This page contains recommendations for scheduling Structured Streaming workloads using jobs on Azure Databricks.
Databricks recommends that you always configure the following:
- Remove unnecessary code from notebooks that would return results, such as display and count.
- Do not run Structured Streaming workloads using all-purpose compute. Always schedule streams as jobs using jobs compute.
- Schedule jobs using Continuous mode. This refers to the Azure Databricks Jobs scheduling feature, not the Structured Streaming trigger interval.
- Do not enable compute autoscaling for Structured Streaming jobs.
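Here is a rough sketch of these recommendations in a job definition. The Python dictionary below mirrors a Jobs API 2.1-style payload: a continuous block instead of a cron schedule, a fixed-size job cluster that sets num_workers and has no autoscale block, and a task that runs on that job cluster rather than on an existing all-purpose cluster. The job name, notebook path, runtime version, and node type are placeholders. check_streaming_job is a hypothetical helper, not part of any Databricks SDK. Confirm the field names against the Jobs API reference before relying on them.

```python
# Hypothetical job settings shaped like a Jobs API 2.1 payload.
# Name, notebook path, runtime version, and node type are placeholders.
streaming_job = {
    "name": "orders-stream",
    # Continuous mode: the Jobs service keeps one run active and restarts it on failure.
    "continuous": {"pause_status": "UNPAUSED"},
    "job_clusters": [
        {
            "job_cluster_key": "stream_cluster",
            "new_cluster": {
                "spark_version": "15.4.x-scala2.12",  # placeholder runtime version
                "node_type_id": "Standard_DS3_v2",    # placeholder Azure node type
                "num_workers": 2,  # fixed size: no "autoscale" block
            },
        }
    ],
    "tasks": [
        {
            "task_key": "ingest_orders",
            "job_cluster_key": "stream_cluster",  # jobs compute, not all-purpose
            "notebook_task": {"notebook_path": "/Workspace/streams/ingest_orders"},
        }
    ],
}


def check_streaming_job(spec: dict) -> list[str]:
    """Hypothetical check: list deviations from the recommendations (empty if none)."""
    problems = []
    if spec.get("continuous", {}).get("pause_status") != "UNPAUSED":
        problems.append("job is not scheduled in continuous mode")
    for cluster in spec.get("job_clusters", []):
        if "autoscale" in cluster.get("new_cluster", {}):
            problems.append(f"{cluster['job_cluster_key']} uses autoscaling")
    for task in spec.get("tasks", []):
        if "existing_cluster_id" in task:
            problems.append(f"{task['task_key']} runs on all-purpose compute")
    return problems


print(check_streaming_job(streaming_job))  # → []
```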
Some workloads benefit from the following:
- Configure RocksDB state store on Azure Databricks
- Asynchronous state checkpointing for stateful queries
- What is asynchronous progress tracking?
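The first two items are enabled through session configuration. The sketch below collects the keys that the linked Azure Databricks pages are understood to use and sets them on a session before a query starts. apply_confs is a hypothetical helper, and you should verify the exact keys on those pages before use. Asynchronous progress tracking is not included because it is set per query as a writer option (asyncProgressTrackingEnabled), not as a session configuration.

```python
# Hypothetical helper. Verify these keys against the linked Azure Databricks
# pages before relying on this sketch.
STATEFUL_STREAMING_CONFS = {
    # Keep streaming state in RocksDB instead of the default store,
    # which holds state in JVM memory.
    "spark.sql.streaming.stateStore.providerClass": (
        "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
    ),
    # Asynchronous state checkpointing; this feature expects the RocksDB store.
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled": "true",
}


def apply_confs(spark, confs: dict) -> None:
    """Set each key on the session; call this before starting the streaming query."""
    for key, value in confs.items():
        spark.conf.set(key, value)


# In a notebook or job: apply_confs(spark, STATEFUL_STREAMING_CONFS)
```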
Azure Databricks has introduced Lakeflow Spark Declarative Pipelines to reduce the complexities of managing production infrastructure for Structured Streaming workloads. Databricks recommends using Lakeflow Spark Declarative Pipelines for new Structured Streaming pipelines. See Lakeflow Spark Declarative Pipelines.
Note
Compute autoscaling has limitations when scaling down cluster size for Structured Streaming workloads. Databricks recommends using Lakeflow Spark Declarative Pipelines with enhanced autoscaling for streaming workloads. See Optimize the cluster utilization of Lakeflow Spark Declarative Pipelines with Autoscaling.
:::note Serverless compute
On serverless compute, only Trigger.AvailableNow() and Trigger.Once() are supported. Databricks recommends Trigger.AvailableNow().
For continuous streaming on serverless compute, run a pipeline in continuous mode. See Triggered vs. continuous pipeline mode.
:::
Design streaming workloads to expect failure
Databricks recommends always configuring streaming jobs to automatically restart on failure. Some capabilities, including schema evolution, require that Structured Streaming workloads are configured to retry automatically. See Configure Structured Streaming jobs to restart streaming queries on failure.
Some operations like foreachBatch provide at-least-once rather than exactly-once guarantees. For these operations, make sure that your processing pipeline is idempotent. See Use foreachBatch to write to arbitrary data sinks.
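foreachBatch calls your function with each micro-batch DataFrame and a batch ID. A batch can be delivered again after a restart, so one common way to make the sink idempotent is to record which batch IDs have already been written and skip repeats. The following Spark-free sketch models that idea with a hypothetical IdempotentSink class, using a list of dictionaries in place of the batch DataFrame. In a real sink, the data write and the batch-ID record must commit atomically. For Delta tables, the txnAppId and txnVersion writer options are designed for this. The in-memory version here does not model atomicity.

```python
# Simplified, Spark-free model of an idempotent foreachBatch sink.
# In a real job Spark calls func(batch_df, batch_id); here a list of dicts
# stands in for batch_df and an in-memory list stands in for the target table.


class IdempotentSink:
    """Hypothetical sink that remembers which batch IDs it has already written."""

    def __init__(self):
        self.rows = []
        self.committed_batches = set()

    def write_batch(self, batch_rows, batch_id):
        # A batch replayed after a restart is skipped instead of appended twice.
        if batch_id in self.committed_batches:
            return
        self.rows.extend(batch_rows)
        self.committed_batches.add(batch_id)


sink = IdempotentSink()
sink.write_batch([{"id": 1}, {"id": 2}], batch_id=0)
sink.write_batch([{"id": 3}], batch_id=1)
sink.write_batch([{"id": 3}], batch_id=1)  # replayed after a simulated failure
print(len(sink.rows))  # → 3
```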
Note
When a query restarts, it first processes the micro-batch that was planned during the previous run. If your job failed due to an out-of-memory error, or you manually canceled it because a micro-batch was too large, you might need to scale up the compute to process that micro-batch successfully.
If you change configurations between runs, these configurations apply to the first new batch planned. See Recover after changes in a Structured Streaming query.
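The following simplified model shows this restart behavior. A Structured Streaming checkpoint keeps an offsets log of planned micro-batches and a commits log of completed ones. On restart, a batch that appears in the offsets log but not in the commits log runs again before any new batch is planned, and changed configurations take effect from the next newly planned batch. The function next_batch_on_restart is hypothetical and ignores everything else a checkpoint stores.

```python
# Simplified model of how a restarted query picks its first micro-batch.
# The dict and set stand in for the checkpoint's offsets and commits logs.


def next_batch_on_restart(offsets_log: dict, commits_log: set) -> tuple[int, str]:
    """Return (batch_id, reason) for the first batch to run after a restart."""
    if not offsets_log:
        return 0, "no checkpoint: plan batch 0 with current configuration"
    last_planned = max(offsets_log)
    if last_planned not in commits_log:
        # Planned by the previous run but never committed: run it again
        # using the offsets recorded in the checkpoint.
        return last_planned, "re-run planned batch"
    # Everything planned was committed; new configuration applies from here.
    return last_planned + 1, "plan new batch with current configuration"


offsets = {0: "offsets-0", 1: "offsets-1", 2: "offsets-2"}
commits = {0, 1}  # batch 2 failed, for example with an out-of-memory error
print(next_batch_on_restart(offsets, commits))  # → (2, 're-run planned batch')
```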
When does a job retry?
You can schedule multiple tasks as part of an Azure Databricks job. When you configure a job using the continuous trigger, you cannot set dependencies between tasks.
You might choose to schedule multiple streams in a single job using one of the following approaches:
- Multiple tasks: Define a job with multiple tasks that run streaming workloads using the continuous trigger.
- Multiple queries: Define multiple streaming queries in the source code for a single task.
You can also combine these strategies. The following table compares these approaches.
| Strategy | Multiple tasks | Multiple queries |
|---|---|---|
| How is compute shared? | Databricks recommends deploying jobs compute appropriately sized to each streaming task. You can optionally share compute across tasks. | All queries share the same compute. You can optionally assign queries to scheduler pools. |
| How are retries handled? | All tasks must fail before the job retries. | The task retries if any query fails. |
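The retry rules in the table can be restated as two predicates. This is a toy model for reasoning about the table, not a Databricks API, and job_retries and task_retries are hypothetical names.

```python
# Toy model of the retry rows in the table; not a Databricks API.


def job_retries(task_failed: list[bool]) -> bool:
    """Multiple tasks: the job retries only after every task has failed."""
    return all(task_failed)


def task_retries(query_failed: list[bool]) -> bool:
    """Multiple queries in one task: the task retries if any query fails."""
    return any(query_failed)


print(job_retries([True, False]))   # → False: one task is still running
print(task_retries([True, False]))  # → True: the task, and all its queries, restart
```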
Configure Structured Streaming jobs to restart streaming queries on failure
Databricks recommends configuring all streaming workloads using the continuous trigger. See Run jobs continuously.
The continuous trigger has the following behavior by default:
- Prevents more than one concurrent run of the job.
- Starts a new run when a previous run fails.
- Uses exponential backoff for retries.
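Exponential backoff means the wait before each successive retry grows geometrically, so a job that keeps failing does not restart in a tight loop. The sketch below computes such a schedule. The base delay, multiplier, and cap are illustrative values chosen for this example, since this page does not state the values the Jobs service uses. Real implementations often add random jitter, which this sketch omits.

```python
# Illustrative exponential backoff schedule. The base delay, factor, and cap
# are hypothetical values, not the ones used by Azure Databricks Jobs.


def backoff_delays(attempts: int, base_seconds: float = 10.0,
                   factor: float = 2.0, cap_seconds: float = 600.0) -> list[float]:
    """Return the wait before each retry attempt, growing by `factor` up to a cap."""
    return [min(cap_seconds, base_seconds * factor ** n) for n in range(attempts)]


print(backoff_delays(8))
# → [10.0, 20.0, 40.0, 80.0, 160.0, 320.0, 600.0, 600.0]
```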
Databricks recommends always using jobs compute instead of all-purpose compute when scheduling workflows. When a job fails and retries, new compute resources are deployed for the retry.
Note
Databricks recommends that you do not use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination(). See When to use awaitTermination().
When to use awaitTermination()
streamingQuery.awaitTermination() and spark.streams.awaitAnyTermination() block the current thread until a streaming query terminates. Whether to use these functions depends on your execution environment.
For Databricks Jobs, do not use streamingQuery.awaitTermination() or spark.streams.awaitAnyTermination(). These functions are not necessary because the Jobs service automatically prevents a run from completing when a streaming query is active. Both functions block notebook cells from completing and prevent the Jobs service from tracking the streaming query, which disrupts backlog metrics and job notifications.
Use awaitTermination() in the following cases:
| Use case | Behavior |
|---|---|
| Interactive notebooks on all-purpose compute | awaitTermination() keeps the cell running, allows you to observe query state, and ensures that failures surface in the notebook output. |
| Local and development environments | When running a Spark program locally, the process exits when the main thread completes. Call awaitTermination() to keep the program alive until the streaming query finishes or fails. |
| Failure propagation to the driver | Without awaitTermination(), a streaming query failure in a non-job context may not propagate to the calling thread. The query can fail silently, making failures harder to detect and diagnose. Calling awaitTermination() re-throws the query exception on the driver. |
Use scheduler pools for multiple streaming queries
You can configure scheduler pools to assign compute capacity to queries when running multiple streaming queries from the same source code.
By default, all queries started in a notebook run in the same fair scheduling pool. Apache Spark jobs generated by triggers from all of the streaming queries in a notebook run one after another in “first in, first out” (FIFO) order. This can cause unnecessary delays in the queries, because they are not efficiently sharing the cluster resources.
Scheduler pools allow you to declare which Structured Streaming queries share compute resources.
The following example assigns query1 to a dedicated pool, whereas query2 and query3 share a scheduler pool.
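```python
# Assumes `spark` is the notebook's SparkSession and `df` is a streaming DataFrame.
# The checkpoint paths are placeholders; each query needs its own location.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").option("checkpointLocation", "/tmp/checkpoints/query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").option("checkpointLocation", "/tmp/checkpoints/query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").option("checkpointLocation", "/tmp/checkpoints/query3").toTable("table3")
```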
Note
The local property configuration must be in the same notebook cell where you start your streaming query.
For more information about Apache fair scheduler pools, see Apache fair scheduler documentation.
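In Apache Spark, you can declare pool properties such as schedulingMode, weight, and minShare in an XML allocation file that the spark.scheduler.allocation.file configuration points to. Pools that are not listed get default settings: FIFO scheduling within the pool, weight 1, and minShare 0. The following sketch renders such a file for the pool names used in the example above. The weights and shares are hypothetical. This page does not cover whether or how to supply an allocation file on Azure Databricks compute, so treat this as an Apache Spark illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical pool settings; pool names match the scheduler pool example.
POOLS = {
    "pool1": {"schedulingMode": "FAIR", "weight": "2", "minShare": "1"},
    "pool2": {"schedulingMode": "FAIR", "weight": "1", "minShare": "0"},
}


def build_allocation_file(pools: dict) -> str:
    """Render pool definitions in the Apache Spark fairscheduler.xml layout."""
    root = ET.Element("allocations")
    for name, settings in pools.items():
        pool = ET.SubElement(root, "pool", name=name)
        for tag, value in settings.items():
            ET.SubElement(pool, tag).text = value
    return ET.tostring(root, encoding="unicode")


print(build_allocation_file(POOLS))
```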