Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Esta página contiene recomendaciones para programar cargas de trabajo de Structured Streaming mediante trabajos en Azure Databricks.
Databricks recomienda configurar siempre lo siguiente:
- Quite el código innecesario de los cuadernos que devolverían resultados, como
displayycount. - No ejecute cargas de trabajo de Structured Streaming mediante proceso multiuso. Programe siempre los flujos como trabajos utilizando proceso de trabajos.
- Programar trabajos mediante el modo
. Esto hace referencia a la característica de programación de trabajos de Azure Databricks, no al intervalo structured Streaming trigger. - No habilite el escalado automático para cómputo en trabajos de Structured Streaming.
Algunas cargas de trabajo se benefician de lo siguiente:
- Configurar el almacén de estado de RocksDB en Azure Databricks
- Comprobación asíncrona del estado de las consultas con estado
- ¿Qué es el seguimiento de progreso asincrónico?
Azure Databricks ha introducido las Canalizaciones Declarativas de Spark de Lakeflow para reducir las complejidades de gestionar la infraestructura de producción en las cargas de trabajo de Structured Streaming. Databricks recomienda usar canalizaciones declarativas de Spark de Lakeflow para las nuevas canalizaciones de Structured Streaming. Consulte Canalizaciones declarativas de Spark de Lakeflow.
Nota
El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar canalizaciones declarativas de Spark de Lakeflow con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones declarativas de Spark de Lakeflow con escalado automático.
:::note Computación sin servidor
En la computación sin servidor, solo Trigger.AvailableNow() y Trigger.Once() se admiten. Databricks recomienda Trigger.AvailableNow().
Para la transmisión continua en informática sin servidor, use desencadenado frente al modo de canalización continua en modo continuo.
Consulte Limitaciones de streaming.
:::
Diseño de cargas de trabajo de streaming para esperar un error
Databricks recomienda configurar siempre los trabajos de streaming para reiniciarse automáticamente en caso de error. Algunas funcionalidades, incluida la evolución del esquema, requieren que las cargas de trabajo de Structured Streaming estén configuradas para volver a intentarlo automáticamente. Consulte Configuración de trabajos de Structured Streaming para que reinicien las consultas de streaming en caso de error.
Algunas operaciones como foreachBatch proporcionan al menos una vez en lugar de garantías exactamente una vez. Para estas operaciones, asegúrese de que la canalización de procesamiento sea idempotente. Consulte Uso de foreachBatch para escribir en receptores de datos arbitrarios.
Nota
Cuando se reinicia una consulta, se procesa el microlote planificado durante la ejecución anterior. Si su trabajo falló debido a un error de falta de memoria o canceló manualmente un trabajo debido a un microlote sobredimensionado, es posible que necesite escalar verticalmente el proceso informático para procesar correctamente el microlote.
Si cambia las configuraciones entre ejecuciones, estas configuraciones se aplican al primer lote nuevo planeado. Consulte Recuperación después de los cambios en una consulta de Structured Streaming.
¿Cuándo se reintenta un trabajo?
Puede programar varias tareas como parte de un trabajo de Azure Databricks. Al configurar un trabajo mediante el desencadenador continuo, no se pueden establecer dependencias entre tareas.
Puede optar por programar varias secuencias en un solo trabajo mediante uno de los siguientes métodos:
- Varias tareas: defina un trabajo con varias tareas que ejecutan cargas de trabajo de streaming mediante el desencadenador continuo.
- Varias consultas: defina varias consultas de streaming en el código fuente para una sola tarea.
También puede combinar estas estrategias. En la siguiente tabla se comparan estos enfoques.
| Estrategia | Varias tareas | Varias consultas |
|---|---|---|
| ¿Cómo se comparte el proceso? | Databricks recomienda implementar procesos de trabajo del tamaño adecuado para cada tarea de streaming. Opcionalmente, puede compartir el proceso entre tareas. | Todas las consultas comparten el mismo proceso. Puede opcionalmente asignar consultas a grupos de planificación. |
| ¿Cómo se controlan los reintentos? | Todas las tareas deben producir un error antes de que el trabajo se reintente. | La tarea vuelve a intentarlo si se produce un error en alguna consulta. |
Configurar trabajos de Structured Streaming para reiniciar las consultas de streaming en caso de error
Databricks recomienda configurar todas las cargas de trabajo de streaming mediante el desencadenador continuo. Consulte Ejecución de trabajos continuamente.
El desencadenador continuo tiene el siguiente comportamiento de forma predeterminada:
- Impide que se ejecute más de una ejecución simultánea del trabajo.
- Inicia una nueva ejecución cuando se produce un error en una ejecución anterior.
- Usa retroceso exponencial para reintentos.
Databricks recomienda usar siempre el proceso de trabajos en lugar del proceso multiuso al programar flujos de trabajo. En caso de error y reintento del trabajo, se implementan nuevos recursos de proceso.
Nota
Databricks recomienda no usar streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination(). Consulte Cuándo usar awaitTermination().
Cuándo usar awaitTermination()
streamingQuery.awaitTermination() y spark.streams.awaitAnyTermination() bloquean el hilo actual hasta que finalice una consulta en streaming. Si se usan estas funciones depende del entorno de ejecución.
En el caso de los trabajos de Databricks, no use streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination(). Estas funciones no son necesarias porque el servicio Trabajos impide que una ejecución se complete automáticamente cuando una consulta de streaming esté activa. Ambas funciones impiden que las celdas del cuaderno finalicen e impidan que el servicio Trabajos realice el seguimiento de la consulta de streaming, lo que interrumpe las métricas de trabajos pendientes y las notificaciones de trabajo.
Use awaitTermination() en los casos siguientes:
| Caso de uso | Comportamiento |
|---|---|
| Cuadernos interactivos en cómputo de propósito general |
awaitTermination() mantiene la celda en ejecución, permite observar el estado de la consulta y garantiza que los errores se muestran en la salida del cuaderno. |
| Entornos locales y de desarrollo | Cuando se ejecuta un programa spark localmente, el proceso se cierra cuando se completa el subproceso principal. Llame awaitTermination() para mantener el programa activo hasta que finalice o falle la consulta de transmisión. |
| Propagación de errores al controlador | Sin awaitTermination(), es posible que un error de consulta de streaming en un contexto que no sea de trabajo no se propague al subproceso que realiza la llamada. La consulta puede producir errores de forma silenciosa, lo que dificulta la detección y el diagnóstico de errores. Al llamar a awaitTermination(), se relanza la excepción de consulta en el controlador de dispositivo. |
Uso de grupos de programador para varias consultas de streaming
Puede configurar conjuntos de planificador para asignar capacidad de cómputo a consultas al ejecutar varias consultas de streaming desde el mismo código fuente.
De manera predeterminada, todas las consultas iniciadas en un cuaderno se ejecutan en el mismo grupo de programación justo. Los trabajos de Apache Spark generados por desencadenadores a partir de todas las consultas de streaming de un cuaderno se ejecutan uno tras otro en orden "primero en entrar, primero en salir" (FIFO). Esto puede provocar retrasos innecesarios en las consultas, ya que no comparten eficazmente los recursos del clúster.
Los grupos de programadores permiten declarar qué consultas de Structured Streaming comparten recursos de proceso.
En el ejemplo siguiente, se asigna query1 a un grupo dedicado, mientras que query2 y query3 comparten un grupo de planificación.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Nota
La configuración de la propiedad local debe estar en la misma celda del cuaderno en que se inicia la consulta de streaming.
Para obtener más información sobre los grupos de Apache Fair Scheduler, consulte la documentación de Apache Fair Scheduler.