Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Databricks raadt aan om autolader in Lakeflow Spark-declaratieve pijplijnen te gebruiken voor incrementele gegevensopname. Lakeflow Spark-declaratieve pijplijnen breidt de functionaliteit uit in Apache Spark Structured Streaming en stelt u in staat om slechts een paar regels declaratieve Python of SQL te schrijven om een gegevenspijplijn van productiekwaliteit te implementeren met:
- Berekeningsinfrastructuur automatisch schalen voor kostenbesparingen:Het clustergebruik van Lakeflow Spark-declaratieve pijplijnen optimaliseren met automatisch schalen
- Gegevenskwaliteitscontroles met verwachtingen:Gegevenskwaliteit beheren met pijplijnwachtingen
- Automatische verwerking van schemaontwikkeling:Schemadeductie en evolutie configureren in AutoLoader
- Bewaking via metrische gegevens in het gebeurtenislogboek:Pijplijn-gebeurtenislogboek
Databricks raadt u ook aan om de aanbevolen procedures voor streaming te volgen bij het gebruik van Auto Loader in productie. Zie Overwegingen voor productie voor gestructureerd streamen.
Moniteren van automatisch laadapparaat
In de volgende secties wordt beschreven hoe u automatisch laadprogramma's in productie kunt bewaken, waaronder metrische gegevens, logboeken, waarschuwingen en algemene werkstromen voor probleemoplossing.
Query's uitvoeren op bestanden die zijn gedetecteerd door Automatisch Laden
Auto Loader biedt een SQL-API voor het inspecteren van de status van een stream. Met behulp van de cloud_files_state functie kunt u metagegevens vinden over bestanden die zijn gedetecteerd door een Auto Loader stream. Query cloud_files_state, waarbij de controlepuntlocatie wordt opgegeven die is gekoppeld aan een stroom voor automatisch laden.
Note
De cloud_files_state functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger.
SELECT * FROM cloud_files_state('path/to/checkpoint');
Luister naar stream-updates
Databricks raadt aan om de Streaming Query Listener-interface van Apache Spark te gebruiken om Auto Loader-streams verder te bewaken. Zie Monitoring van Structured Streaming-queries op Azure Databricks.
Auto Loader rapporteert metrische gegevens aan de streamingquerylister in elke batch. U kunt zien hoeveel bestanden er in de achterstand staan en hoe groot de achterstand is in de numFilesOutstanding en numBytesOutstanding-metingen onder het tabblad Onbewerkte gegevens in het voortgangsdashboard van streamingquery's.
{
"sources": [
{
"description": "CloudFilesSource[/path/to/source]",
"metrics": {
"numFilesOutstanding": "238",
"numBytesOutstanding": "163939124006"
}
}
]
}
Wanneer u de modus voor bestandsmeldingen gebruikt in Databricks Runtime 10.4 LTS en hoger, bevatten de metrische gegevens ook het geschatte aantal bestandsevenementen in de cloudwachtrij als approximateQueueSize voor AWS en Azure.
Kostenoverwegingen
Bij het uitvoeren van automatisch laden zijn uw belangrijkste kostenbronnen rekenresources en bestandsdetectie.
Om de rekenkosten te verlagen, raadt Databricks aan om Lakeflow-taken te gebruiken om Automatisch laden te plannen als batchtaken Trigger.AvailableNow in plaats van deze continu uit te voeren zolang u geen lage latentievereisten hebt. Zie Triggerintervallen voor Gestructureerd Streamen configureren. Deze batchtaken kunnen worden geactiveerd met behulp van triggers voor bestands aankomst om de latentie tussen de aankomst en verwerking van bestanden verder te verlagen.
De kosten voor bestandsdetectie kunnen komen in de vorm van LIST bewerkingen op uw opslagaccounts in directoryvermeldingmodus en API-verzoeken op de abonnementsservice en wachtrijservice in bestandsnotificatiemodus. Databricks raadt het volgende aan om de kosten voor bestandsdetectie te verlagen:
- Gebruik de
ProcessingTimeofContinuoustriggers niet wanneer automatisch laden continu wordt uitgevoerd in de lijstweergavemodus. Gebruik in plaats daarvan de modus voor bestandsmeldingen met bestandsevenementen. Zie het overzicht van AutoLoader met bestandsevenementen voor meer informatie over de werking van automatisch laden met bestandsevenementen. - Gebruik de klassieke modus voor bestandsmeldingen als u automatisch laden niet kunt gebruiken met bestandsevenementen. In deze modus kunt u resources taggen die door Auto Loader zijn gemaakt om uw kosten bij te houden met behulp van resourcetags.
Brongegevensretentie
Note
Beschikbaar in Databricks Runtime 16.4 LTS en hoger.
Naarmate bestanden zich in uw bronmap verzamelen, nemen de opslagkosten toe en vertraagt de detectie van bestanden, met name in de modus voor het weergeven van mappen. Automatisch laden biedt de cloudFiles.cleanSource mogelijkheid om bestandsretentie automatisch te beheren door bestanden te archiveren of te verwijderen nadat ze zijn verwerkt.
Bestanden in de bronmap archiveren om de kosten te verlagen
Warning
- Instellen van
cloudFiles.cleanSourceverwijdert of verplaatst bestanden in de bronmap. - Als u gebruik maakt van
foreachBatchvoor de gegevensverwerking, worden uw bestanden verplaats- of verwijderkandidaten zodra uwforeachBatch-bewerking succesvol is afgerond, zelfs als uw bewerking alleen een subset van de bestanden in de batch heeft verwerkt.
Databricks raadt het gebruik van autolaadprogramma's aan met bestandsevenementen om de detectiekosten te verlagen. Dit vermindert ook de rekenkosten omdat detectie incrementeel is.
Als u geen bestandsevenementen kunt gebruiken en mappenlijstvermelding moet gebruiken om bestanden te ontdekken, kunt u de cloudFiles.cleanSource optie gebruiken om bestanden automatisch te archiveren of te verwijderen nadat Auto Loader ze heeft verwerkt, om de detectiekosten te verlagen. Doordat Auto Loader na de verwerking bestanden opschoont uit uw bronmap, hoeven er tijdens de detectie minder bestanden te worden vermeld.
Houd bij gebruik cloudFiles.cleanSource met de MOVE optie rekening met de volgende vereisten:
- Zowel de bronmap als de doelmap voor verplaatsen moeten zich in dezelfde bucket of container bevinden. Verplaatsingen tussen verschillende buckets en containers worden niet ondersteund en resulteren in een fout.
- De verplaatsingsbestemming kan een volumepad zijn (bijvoorbeeld
/Volumes/my_catalog/my_schema/my_volume/archive/). - Als uw bron- en doelmap zich op dezelfde externe locatie bevinden, moeten ze geen naastliggende mappen hebben die beheerde opslag bevatten (bijvoorbeeld een beheerd volume of catalogus). In dergelijke gevallen kan Auto Loader niet de noodzakelijke machtigingen krijgen om naar de doelmap te schrijven.
Databricks raadt aan deze optie te gebruiken wanneer:
- Uw bronmap verzamelt een groot aantal bestanden in de loop van de tijd.
- U moet verwerkte bestanden bewaren voor naleving of controle (ingesteld op
cloudFiles.cleanSourceMOVE). - U wilt de opslagkosten verlagen door bestanden te verwijderen na opname (ingesteld
cloudFiles.cleanSourceopDELETE). Wanneer u deDELETEmodus gebruikt, raadt Databricks aan om versiebeheer op de bucket in te schakelen, zodat Auto Loader optreedt als soft-deletes en beschikbaar is in geval van misconfiguraties. Bovendien raadt Databricks aan om cloudlevenscyclusbeleid in te stellen om oude, tijdelijk verwijderde versies na een opgegeven uitstelperiode (zoals 60 of 90 dagen) te verwijderen, gebaseerd op uw herstelvereisten.
Zie cleanSource voor de volledige naslaginformatie over opties en hun standaardinstellingen.
Verwerkte bestanden verplaatsen naar een cold storage-pad
In het volgende voorbeeld wordt Automatisch laden geconfigureerd voor het verplaatsen van verwerkte bestanden naar een archiefmap binnen dezelfde bucket na 14 dagen. U kunt een levenscyclusbeleid voor de cloud toepassen op het archiefpad om bestanden over te brengen naar goedkopere opslaglagen (bijvoorbeeld AWS S3 Glacier, Azure Cool/Archive of GCS Coldline/Archive).
Python
# Step 1: Configure Auto Loader to move processed files to an archive path.
checkpoint = "/Volumes/my_catalog/my_schema/my_volume/checkpoints/ingest_stream"
archive_path = "s3://my-bucket/archive/landing/"
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.cleanSource", "MOVE")
.option("cloudFiles.cleanSource.moveDestination", archive_path)
.option("cloudFiles.cleanSource.retentionDuration", "14 days")
.option("cloudFiles.schemaLocation", checkpoint)
.load("s3://my-bucket/landing/")
)
# Step 2: Write to a Delta table.
(df.writeStream
.option("checkpointLocation", checkpoint)
.trigger(availableNow=True)
.toTable("my_catalog.my_schema.raw_events")
)
# Step 3 (outside Databricks): Set up a cloud lifecycle policy on the
# archive path to transition files to cold storage after a grace period.
# For example, in AWS you can configure an S3 Lifecycle rule to move
# objects under s3://my-bucket/archive/landing/ to S3 Glacier after
# 30 days.
SQL
-- Step 1: Configure Auto Loader to move processed files to an archive path
-- using a Lakeflow Declarative Pipeline.
CREATE OR REFRESH STREAMING TABLE raw_events
AS SELECT * FROM STREAM read_files(
's3://my-bucket/landing/',
format => 'json',
cleanSource => 'MOVE',
cleanSourceMoveDestination => 's3://my-bucket/archive/landing/',
cleanSourceRetentionDuration => '14 days'
);
-- Step 2 (outside Databricks): Set up a cloud lifecycle policy on the
-- archive path to transition files to cold storage.
-- For example, in AWS configure an S3 Lifecycle rule to move objects
-- under s3://my-bucket/archive/landing/ to S3 Glacier after 30 days.
Trigger.AvailableNow en snelheidsbeperking gebruiken
Note
Beschikbaar in Databricks Runtime 10.4 LTS en hoger.
"Auto Loader kan worden gepland om als batchtaak in Lakeflow-taken te draaien met behulp van Trigger.AvailableNow." Met AvailableNow de trigger wordt Auto Loader geïnstrueerd om alle bestanden te verwerken die vóór de begintijd van de query zijn aangekomen. Nieuwe bestanden die binnenkomen nadat de stream is gestart, worden genegeerd tot de volgende trigger.
Met Trigger.AvailableNow vindt bestandsdetectie asynchroon plaats terwijl gegevens worden verwerkt over meerdere microbatches met een snelheidslimiet. Automatisch laden verwerkt standaard maximaal 1000 bestanden elke microbatch. U kunt cloudFiles.maxFilesPerTrigger en cloudFiles.maxBytesPerTrigger configureren om te bepalen hoeveel bestanden of bytes moeten worden verwerkt in een microbatch. De bestandslimiet is een vaste limiet, maar de bytelimiet is een zachte limiet, wat betekent dat meer bytes kunnen worden verwerkt dan de opgegeven maxBytesPerTrigger. Wanneer de opties beide samen worden geleverd, verwerkt Auto Loader zoveel bestanden die nodig zijn om een van de limieten te bereiken.
Locatie van controlepunt
De locatie van het controlepunt wordt gebruikt om de status- en voortgangsgegevens van de stream op te slaan. Databricks raadt aan om de controlepuntlocatie in te stellen op een locatie zonder levenscyclusbeleid voor cloudobjecten. Als bestanden op de controlepuntlocatie worden opgeschoond volgens het beleid, raakt de gegevensstroom verstoord. Als dit gebeurt, moet u de stream helemaal opnieuw starten.
Tracering van bestandsevenementen
Auto Loader volgt de gedetecteerde bestanden op de controlepuntlocatie met behulp van RocksDB om exact-once-verwerkingsgaranties te bieden. Databricks raadt u aan een upgrade uit te voeren naar Databricks Runtime 15.4 LTS of hoger voor grote volumes of langdurige opnamestromen. In deze versies wacht Auto Loader niet totdat de volledige RocksDB-status is gedownload voordat de stream wordt gestart, waardoor de opstarttijd van de stream kan worden versneld.
Als u wilt voorkomen dat de bestandsstatussen zonder limieten groeien, kunt u ook overwegen om cloudFiles.maxFileAge bestandsevenementen te laten verlopen die ouder zijn dan een bepaalde leeftijd. De minimumwaarde die u voor cloudFiles.maxFileAge kunt instellen, is "14 days". Verwijderingen in RocksDB worden weergegeven als tombstone-vermeldingen. Daarom kunt u zien dat het opslaggebruik tijdelijk kan toenemen wanneer gebeurtenissen verlopen voordat het begint te stabiliseren.
Warning
cloudFiles.maxFileAge wordt geleverd als kostenbeheermechanisme voor gegevenssets met een hoog volume. Het afstemmen cloudFiles.maxFileAge kan te agressief leiden tot problemen met gegevenskwaliteit, zoals dubbele opname of ontbrekende bestanden. Daarom raadt Databricks een conservatieve instelling aan voor cloudFiles.maxFileAge, zoals 90 dagen, die vergelijkbaar is met wat vergelijkbare oplossingen voor gegevensopname aanbevelen.
Als u de cloudFiles.maxFileAge optie probeert af te stemmen, kan dit ertoe leiden dat niet-verwerkte bestanden worden genegeerd door automatisch laden of al verwerkte bestanden verlopen en vervolgens opnieuw worden verwerkt, waardoor dubbele gegevens worden veroorzaakt. Hier volgen enkele aandachtspunten bij het kiezen van een cloudFiles.maxFileAge:
- Als uw stream na lange tijd opnieuw wordt opgestart, worden bestandsmeldingsgebeurtenissen die zijn opgehaald uit de wachtrij die ouder zijn dan
cloudFiles.maxFileAgegenegeerd. Als u een directorylijst gebruikt, worden bestanden die mogelijk zijn verschenen tijdens de storingstijd en ouder zijn dancloudFiles.maxFileAge, genegeerd. - Als u de modus voor het weergeven van mappen gebruikt en
cloudFiles.maxFileAgegebruikt, bijvoorbeeld ingesteld op"1 month", stopt u de stream en start u de stream opnieuw op metcloudFiles.maxFileAgeingesteld op"2 months", bestanden die ouder zijn dan 1 maand, maar recenter dan 2 maanden opnieuw worden verwerkt.
Als u deze optie de eerste keer instelt dat u de stream start, neemt u geen gegevens op die ouder zijn dan cloudFiles.maxFileAge, dus als u oude gegevens wilt opnemen, moet u deze optie niet instellen wanneer u de stream voor de eerste keer start. U moet deze optie echter instellen voor volgende uitvoeringen.
Reguliere backfills activeren met behulp van cloudFiles.backfillInterval
In zeldzame gevallen kunnen bestanden worden gemist of te laat wanneer ze alleen afhankelijk zijn van meldingssystemen, zoals wanneer de bewaarlimieten voor meldingen worden bereikt. Als u strikte vereisten hebt voor de volledigheid van gegevens en SLA, kunt u overwegen om cloudFiles.backfillInterval in te stellen om asynchrone backfills op een bepaald interval te activeren. Stel deze bijvoorbeeld in op één dag voor dagelijkse backfills of één week voor wekelijkse backfills. Het activeren van reguliere backfills veroorzaakt geen duplicaten.
Wanneer u bestandsevenementen gebruikt, voert u uw stream minstens één keer per 7 dagen uit
Wanneer u bestandsevenementen gebruikt, voert u uw Auto Loader-streams minstens één keer per 7 dagen uit om een volledige lijst met mappen te voorkomen. Wanneer u uw Auto Loader-streams zo vaak uitvoert, zorgt dit ervoor dat de detectie van bestanden incrementeel verloopt.