Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Importante
A update_flow API está em Visualização Pública.
Use o @dp.update_flow decorador para criar um fluxo de atualizações. Os fluxos de atualização escrevem para os sinks usando o modo de saída de atualização, emitindo apenas as linhas que mudaram em cada lote. Ao contrário dos fluxos anexos, suportam agregações com estado sem necessidade de marca de água.
Os fluxos de atualização só podem atingir os sumidouros. As tabelas Delta não são suportadas.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parâmetro | Tipo | Description |
|---|---|---|
| função | function |
Required. Uma função que retorna um DataFrame de streaming Apache Spark de uma consulta definida pelo usuário. |
target |
str |
Required. O nome do sumidouro onde este fluxo escreve. |
name |
str |
O nome do fluxo. Se não for fornecido, o padrão será o nome da função. |
comment |
str |
Uma descrição para o fluxo. |
spark_conf |
dict |
Um ditado das configurações do Spark para a execução desta consulta. Estas configurações sobrepõem as conferências definidas para o destino, pipeline ou cluster. |
import_checkpoint |
str |
Um caminho de checkpoint externo para importar antes de iniciar o fluxo. Importado apenas uma vez, quando o diretório de checkpoint do fluxo ainda não existe. |
Exemplos
Agregação a um sumidouro de Kafka
Escreva resultados de agregação com estado num sumidouro de Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Modo em tempo real
Use spark_conf para configurar um fluxo de atualização para o modo em tempo real:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitações
- Os sumidouros de tabela Delta não são suportados como alvos para fluxos de atualização.