Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Define a saída da consulta de streaming a ser processada usando o gravador fornecido. A lógica de processamento pode ser especificada como uma função que usa uma linha como entrada ou como um objeto com process(row) métodos e open(partition_id, epoch_id) opcionaisclose(error).
Sintaxe
foreach(f)
Parâmetros
| Parâmetro | Tipo | Descrição |
|---|---|---|
f |
callable ou object | Uma função que usa uma linha como entrada ou um objeto com um process(row) método e métodos e open opcionaisclose. |
Devoluções
DataStreamWriter
Observações
O objeto fornecido deve ser serializável. Qualquer inicialização para gravar dados (por exemplo, abrir uma conexão) deve ser feita no interior open(), não no momento da construção.
Exemplos
import time
df = spark.readStream.format("rate").load()
Processar cada linha usando uma função:
def print_row(row):
print(row)
q = df.writeStream.foreach(print_row).start()
time.sleep(3)
q.stop()
Processe cada linha usando um objeto com open, processe close métodos:
class RowPrinter:
def open(self, partition_id, epoch_id):
print("Opened %d, %d" % (partition_id, epoch_id))
return True
def process(self, row):
print(row)
def close(self, error):
print("Closed with error: %s" % str(error))
q = df.writeStream.foreach(RowPrinter()).start()
time.sleep(3)
q.stop()