Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Du kan läsa in data från alla datakällor som stöds av Apache Spark på Azure Databricks med hjälp av pipelines. Du kan definiera datauppsättningar – tabeller och vyer – i Lakeflow Spark deklarativa pipelines för alla frågor som returnerar en Spark DataFrame, inklusive strömmande DataFrames och Pandas på Spark DataFrames. För datainmatningsuppgifter rekommenderar Databricks att du använder strömningstabeller för de flesta användningsfall. Strömmande tabeller är användbara för att mata in data från molnobjektlagring med hjälp av Auto Loader eller från meddelandebussar som Kafka.
Alla datakällor har inte SQL-stöd för inmatning. Du kan dock blanda SQL- och Python-källor i samma pipeline för att använda Python där det behövs. Mer information om hur du arbetar med bibliotek som inte är paketerade i Lakeflow Spark Deklarativa pipelines som standard finns i Hantera Python-beroenden för pipelines. Allmän information om datainmatning i Azure Databricks finns i Standardanslutningar i Lakeflow Connect.
I följande exempel visas några vanliga datainläsningsmönster.
Ladda från en befintlig tabell
Läs in data från en befintlig tabell i Azure Databricks. Du kan transformera data med hjälp av en fråga eller läsa in tabellen för vidare bearbetning i din pipeline.
python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
SQL
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Läsa in filer från molnobjektlagring
Databricks rekommenderar att du använder Auto Loader i pipelines för de flesta datainmatningsuppgifter från objektlagring i molnet eller från filer i en Unity Catalog volym. Automatisk inläsning och pipelines är utformade för att inkrementellt och idempotent läsa in ständigt växande data när de kommer till molnlagringen. Se Vad är automatisk inläsning? och Läsa in data från objektlagring.
I följande exempel läses data från molnlagring med Auto Loader.
python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
)
SQL
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
I följande exempel används Auto Loader för att skapa datamängder från CSV-filer på en Unity Catalog-volym.
python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
Anmärkning
- Om du använder Auto Loader med filaviseringar och kör en fullständig uppdatering för din pipeline eller strömningstabell måste du rensa dina resurser manuellt. Du kan använda CloudFilesResourceManager i en anteckningsbok för att utföra rensning.
- Om du vill läsa in filer med Auto Loader i en Unity Catalog-aktiverad pipeline måste du använda externa platser. Mer information om hur du använder Unity Catalog med pipelines finns i Använda Unity Catalog med pipelines.
Autentisera till molnlagring
Auto Loader använder externa lagringsplatser i Unity Catalog för att autentisera mot molnlagring. Du måste konfigurera en extern plats för den lagringssökväg som du vill läsa från och ge behörigheten READ FILES till den körbara användaren.
Om du vill mata in från Azure Data Lake Storage konfigurerar du en extern plats som backas upp av en lagringsautentiseringsuppgift som refererar till en lagringscontainer. Mer information finns i Ansluta till molnobjektlagring med Unity Catalog.
Läsa in data från en meddelandebuss
Du kan konfigurera pipelines för att mata in data från meddelandebussar. Databricks rekommenderar att du använder strömningstabeller med kontinuerlig exekvering och förbättrad autoskalning för att ge den optimala inmatningen för låg latens inläsning från meddelandebussar. För mer information, se Optimera klusteranvändningen av Lakeflow Spark Deklarativa Pipelines med Autoskalning.
Följande kod konfigurerar till exempel en strömmande tabell för att mata in data från Kafka med hjälp av funktionen read_kafka .
python
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
Information om hur du matar in från andra meddelandebusskällor finns i:
- Kinesis: read_kinesis
- Pub-/underämne: read_pubsub
- Pulsar: read_pulsar
Ladda data från Azure Event Hubs
Azure Event Hubs är en dataströmningstjänst som tillhandahåller ett Apache Kafka-kompatibelt gränssnitt. Du kan använda Structured Streaming Kafka-kopplingen, som ingår i Lakeflow Sparks deklarativa pipelines-körning, för att läsa in meddelanden från Azure Event Hubs. Mer information om hur du läser in och bearbetar meddelanden från Azure Event Hubs finns i Använda Azure Event Hubs som en pipelinedatakälla.
Läsa in data från externa system
Deklarativa Lakeflow Spark-pipelines stödjer datainläsning från alla datakällor som stöds av Azure Databricks. Se Ansluta till datakällor och externa tjänster. Du kan också ladda in externa data med Lakehouse Federation för stödda datakällor . Eftersom Lakehouse Federation kräver Databricks Runtime 13.3 LTS eller senare måste du för att använda Lakehouse Federation konfigurera din pipeline för att använda förhandsgranskningskanalen.
Vissa datakällor har inte motsvarande SQL-stöd. Om du inte kan använda Lakehouse Federation med någon av dessa datakällor kan du använda Python för att mata in data från källan. Du kan lägga till Python- och SQL-källfiler i samma pipeline. I följande exempel deklareras en materialiserad vy för att få åtkomst till det aktuella tillståndet för data i en fjärransluten PostgreSQL-tabell.
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
Läs in små eller statiska datamängder från molnobjektlagring
Du kan läsa in små eller statiska datauppsättningar med apache Spark-inläsningssyntax. Lakeflow Spark deklarativa pipelines stöder alla filformat som stöds av Apache Spark på Azure Databricks. En fullständig lista finns i Alternativ för dataformat.
I följande exempel visas hur du läser in JSON för att skapa en tabell.
python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
Anmärkning
Funktionen read_files SQL är gemensam för alla SQL-miljöer i Azure Databricks. Det är det rekommenderade mönstret för direkt filåtkomst med SQL i pipelines. Mer information finns i Alternativ.
Läsa in data från en anpassad Python-datakälla
Med anpassade Python-datakällor kan du läsa in data i anpassade format. Du kan skriva kod för att läsa från och skriva till en specifik extern datakälla eller använda din befintliga Python kod för att läsa data från dina egna interna system. Mer information om hur du utvecklar Python-datakällor finns i PySpark-anpassade datakällor.
I följande exempel registreras en anpassad datakälla med formatnamnet my_custom_datasource och läser från den i både batch- och strömningslägen.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell
Som standard kräver strömmande tabeller källor som endast tillåter tillägg. Om din källströmningstabell kräver uppdateringar eller borttagningar — till exempel för GDPR-ändamålen "rätten att bli bortglömd" — använd skipChangeCommits flaggan för att ignorera dessa ändringar. Den här flaggan fungerar bara med spark.readStream funktionen option() och kan inte användas när källuppspelningstabellen är målet för en create_auto_cdc_flow() funktion. Mer information finns i Hantera ändringar i Delta-källtabeller.
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
Kom åt lagringsuppgifter på ett säkert sätt med säkerhetsnycklar i en pipeline
Du kan använda Azure Databricks-hemligheter för att lagra autentiseringsuppgifter som åtkomstnycklar eller lösenord. Om du vill konfigurera hemligheten i din pipeline använder du en Spark-egenskap i klusterkonfigurationen för pipelineinställningar. För pipelines, se Konfigurera klassisk beräkning.
I följande exempel används en hemlighet för att lagra en åtkomstnyckel som krävs för att läsa indata från ett Azure Data Lake Storage lagringskonto med automatisk inläsning. Du kan använda samma metod för att konfigurera alla hemligheter som krävs av din pipeline, till exempel AWS-nycklar för att komma åt S3 eller lösenordet till ett Apache Hive-metaarkiv.
Mer information om hur du arbetar med Azure Data Lake Storage finns i Ansluta till Azure Data Lake Storage och Blob Storage.
Anmärkning
Du måste lägga till prefixet spark.hadoop. till spark_conf konfigurationsnyckeln som anger det hemliga värdet.
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
Ersätt följande värden i det här kodexemplet.
| Platshållare | Ersätt med |
|---|---|
<container-name> |
Namnet på containern för Azure-lagringskontot. |
<storage-account-name> |
Namnet på ADLS-lagringskontot. |
<path> |
Sökvägen för pipelinens utdata och metadata. |
<scope-name> |
Namnet på Azure Databricks-hemlighetsomfånget. |
<secret-name> |
Namnet på nyckeln som innehåller åtkomstnyckeln för Azure lagringskonto. |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Ersätt följande värden i det här kodexemplet.
| Platshållare | Ersätt med |
|---|---|
<container-name> |
Namnet på containern Azure lagringskonto som lagrar indata. |
<storage-account-name> |
Namnet på ADLS-lagringskontot. |
<path-to-input-dataset> |
Sökvägen till indatauppsättningen. |