For background jobs, you often need to ensure that only one instance of a particular orchestrator runs at a time, which prevents multiple orchestrations from running concurrently. You can implement this singleton pattern in Durable Functions or the Durable Task SDKs by assigning a specific instance ID to an orchestrator when you create it, and then checking whether an instance with that ID is already running before you start a new one.
This article shows how to implement singleton orchestrators, with code examples for each supported language.
Prerequisites
Note
There's a potential race condition in the singleton pattern. If two clients run the check-and-start logic at the same time, both calls can report success, but only one orchestration instance actually starts. Depending on your requirements, this behavior might have undesirable side effects. If you need strict single-instance guarantees, consider adding additional locking mechanisms.
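One way to close the race within a single process is to serialize the check-and-start step behind a lock. The following is a minimal sketch under stated assumptions, not a definitive implementation: `FakeClient`, `start_singleton`, and `_start_lock` are hypothetical names, and the in-memory client only stands in for a real durable client. A process-local `threading.Lock` doesn't coordinate across multiple hosts, so a scaled-out app would need a distributed lock instead.

```python
import threading


class FakeClient:
    """Hypothetical in-memory stand-in for a durable client (not a real SDK type)."""

    TERMINAL = {"Completed", "Failed", "Terminated"}

    def __init__(self):
        self.instances = {}   # instance_id -> runtime status string
        self.start_calls = 0  # number of start requests that were issued

    def get_status(self, instance_id):
        return self.instances.get(instance_id)

    def start_new(self, instance_id):
        self.start_calls += 1
        self.instances[instance_id] = "Running"


# Process-local lock (assumption: all callers run in the same process).
_start_lock = threading.Lock()


def start_singleton(client, instance_id):
    """Return True if this call started the instance, False if one is still active."""
    with _start_lock:  # check and start happen as one atomic step in this process
        status = client.get_status(instance_id)
        if status is None or status in FakeClient.TERMINAL:
            client.start_new(instance_id)
            return True
        return False


client = FakeClient()
results = []
threads = [
    threading.Thread(
        target=lambda: results.append(start_singleton(client, "singleton-job"))
    )
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Exactly one caller reports success, and exactly one start request was issued.
print(results.count(True), client.start_calls)  # 1 1
```

With the lock in place, the seven callers that lose the race see the running instance and report a conflict instead of a false success.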
Important
The PowerShell Durable Task SDK isn't currently available.
Singleton orchestrator example
The following example shows an HTTP trigger function that creates a singleton background job orchestration. The code tries to ensure that only one active instance exists for a given instance ID.
[Function("HttpStartSingle")]
public static async Task<HttpResponseData> RunSingle(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestrators/{functionName}/{instanceId}")] HttpRequestData req,
    [DurableClient] DurableTaskClient starter,
    string functionName,
    string instanceId,
    FunctionContext executionContext)
{
    ILogger logger = executionContext.GetLogger("HttpStartSingle");

    // Check whether an instance with the specified ID exists and, if so, whether it has
    // stopped running (completed, failed, or terminated).
    OrchestrationMetadata? existingInstance = await starter.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
    if (existingInstance == null
        || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
        || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
        || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
    {
        // No instance with the specified ID exists, or the existing one stopped running; create one.
        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        await starter.ScheduleNewOrchestrationInstanceAsync(functionName, requestBody, new StartOrchestrationOptions { InstanceId = instanceId });
        logger.LogInformation($"Started orchestration with ID = '{instanceId}'.");
        return await starter.CreateCheckStatusResponseAsync(req, instanceId);
    }
    else
    {
        // An instance with the specified ID exists and is still active; don't create one.
        var response = req.CreateResponse(HttpStatusCode.Conflict);
        await response.WriteStringAsync($"An instance with ID '{instanceId}' already exists.");
        return response;
    }
}
Note
The preceding C# code is for the isolated worker model, which is the recommended model for .NET apps. For more information about the differences between the in-process and isolated worker models, see the Durable Functions versions article.
Important
This JavaScript example uses the Node.js programming model v3, which uses function.json. If you're using the Node.js programming model v4 (the current default), use the v4 code-first pattern instead.
function.json
{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "route": "orchestrators/{functionName}/{instanceId}",
      "methods": ["post"]
    },
    {
      "name": "starter",
      "type": "orchestrationClient",
      "direction": "in"
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}
index.js
const df = require("durable-functions");

module.exports = async function(context, req) {
    const client = df.getClient(context);
    const instanceId = req.params.instanceId;
    const functionName = req.params.functionName;

    // Check whether an instance with the specified ID exists and, if so, whether it has
    // stopped running (completed, failed, or terminated).
    const existingInstance = await client.getStatus(instanceId);
    if (!existingInstance
        || existingInstance.runtimeStatus == "Completed"
        || existingInstance.runtimeStatus == "Failed"
        || existingInstance.runtimeStatus == "Terminated") {
        // No instance with the specified ID exists, or the existing one stopped running; create one.
        const eventData = req.body;
        await client.startNew(functionName, instanceId, eventData);
        context.log(`Started orchestration with ID = '${instanceId}'.`);
        return client.createCheckStatusResponse(req, instanceId);
    } else {
        // An instance with the specified ID exists and is still active; don't create one.
        return {
            status: 409,
            body: `An instance with ID '${instanceId}' already exists.`,
        };
    }
};
function_app.py
import logging

import azure.functions as func
import azure.durable_functions as df

app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)


@app.route(route="orchestrators/{functionName}/{instanceId}", methods=["POST"])
@app.durable_client_input(client_name="client")
async def http_start_single(req: func.HttpRequest, client):
    instance_id = req.route_params["instanceId"]
    function_name = req.route_params["functionName"]

    # Check whether an instance with the specified ID exists and has stopped running.
    existing_instance = await client.get_status(instance_id)
    if not existing_instance or existing_instance.runtime_status in [
        df.OrchestrationRuntimeStatus.Completed,
        df.OrchestrationRuntimeStatus.Failed,
        df.OrchestrationRuntimeStatus.Terminated,
    ]:
        # No active instance with this ID; create one.
        event_data = req.get_body()
        instance_id = await client.start_new(function_name, instance_id, event_data)
        logging.info(f"Started orchestration with ID = '{instance_id}'.")
        return client.create_check_status_response(req, instance_id)

    # An instance with this ID is still active; don't create one.
    return func.HttpResponse(
        body=f"An instance with ID '{instance_id}' already exists.",
        status_code=409,
    )
using namespace System.Net

param($Request, $TriggerMetadata)

$FunctionName = $Request.Params.FunctionName
$InstanceId = $Request.Params.InstanceId

# Check if an instance with the specified ID already exists
$existingInstance = Get-DurableStatus -InstanceId $InstanceId

if (-not $existingInstance -or
    $existingInstance.RuntimeStatus -eq "Completed" -or
    $existingInstance.RuntimeStatus -eq "Failed" -or
    $existingInstance.RuntimeStatus -eq "Terminated") {
    # No instance with the specified ID exists, or the existing one stopped running; create one
    $InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -InstanceId $InstanceId -Input $Request.Body
    Write-Host "Started orchestration with ID = '$InstanceId'."

    $Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
    Push-OutputBinding -Name Response -Value $Response
}
else {
    # An instance with the specified ID exists and is still active
    Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
        StatusCode = [HttpStatusCode]::Conflict
        Body = "An instance with ID '$InstanceId' already exists."
    })
}
@FunctionName("HttpStartSingle")
public HttpResponseMessage runSingle(
        @HttpTrigger(name = "req", route = "orchestrators/{functionName}/{instanceId}") HttpRequestMessage<?> req,
        @DurableClientInput(name = "durableContext") DurableClientContext durableContext) {

    String instanceId = req.getHeaders().getOrDefault("instanceId", "singleton-job");
    DurableTaskClient client = durableContext.getClient();

    // Check to see if an instance with this ID is already running
    OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, false);
    if (metadata == null || !metadata.isRunning()) {
        // No such instance exists or it finished - create a new one.
        // De-dupe is handled automatically in the storage layer if another
        // function tries to also use this instance ID.
        client.scheduleNewOrchestrationInstance("MyOrchestration", null, instanceId);
        return durableContext.createCheckStatusResponse(req, instanceId);
    }

    return req.createResponseBuilder(HttpStatus.CONFLICT)
            .body("An instance with ID '" + instanceId + "' already exists.")
            .build();
}
The following example shows how to create a singleton orchestration by using the Durable Task SDKs. The code tries to ensure that only one active instance exists for a given instance ID.
using Microsoft.DurableTask.Client;

// Check if an instance with the specified ID already exists
string instanceId = "singleton-job";
OrchestrationMetadata? existingInstance = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: false);

if (existingInstance == null ||
    existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
    existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
    existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
    // No instance with the specified ID exists, or the existing one stopped running; create one.
    await client.ScheduleNewOrchestrationInstanceAsync("MyOrchestration", input, new StartOrchestrationOptions(instanceId));
    Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
}
else
{
    // An instance with the specified ID exists and is still active.
    Console.WriteLine($"An instance with ID '{instanceId}' already exists.");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.client import OrchestrationStatus

instance_id = "singleton-job"

# Check if an instance with the specified ID already exists
existing_instance = client.get_orchestration_state(instance_id)

# runtime_status is an OrchestrationStatus enum, so compare against enum members, not strings.
if (existing_instance is None or
        existing_instance.runtime_status in [
            OrchestrationStatus.COMPLETED,
            OrchestrationStatus.FAILED,
            OrchestrationStatus.TERMINATED,
        ]):
    # No instance with the specified ID exists, or the existing one stopped running; create one.
    client.schedule_new_orchestration(my_orchestration, input=input_data, instance_id=instance_id)
    print(f"Started orchestration with ID = '{instance_id}'.")
else:
    # An instance with the specified ID exists and is still active.
    print(f"An instance with ID '{instance_id}' already exists.")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;

String instanceId = "singleton-job";

// Check to see if an instance with this ID is already running
OrchestrationMetadata existingInstance = client.getInstanceMetadata(instanceId, false);

if (existingInstance == null || !existingInstance.isRunning()) {
    // An instance doesn't exist or finished - create one
    client.scheduleNewOrchestrationInstance("MyOrchestration", input, instanceId);
    System.out.println("Started orchestration with ID = '" + instanceId + "'.");
} else {
    // An instance with the specified ID exists and is still running
    System.out.println("An instance with ID '" + instanceId + "' already exists.");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";

const client = createAzureManagedClient(connectionString);
const instanceId = "singleton-job";

// Check if an instance with the specified ID already exists
const existingInstance = await client.getOrchestrationState(instanceId, false);

if (!existingInstance ||
    existingInstance.runtimeStatus === OrchestrationStatus.COMPLETED ||
    existingInstance.runtimeStatus === OrchestrationStatus.FAILED ||
    existingInstance.runtimeStatus === OrchestrationStatus.TERMINATED) {
    // No instance with the specified ID exists, or the existing one stopped running; create one.
    await client.scheduleNewOrchestration("MyOrchestration", input, instanceId);
    console.log(`Started orchestration with ID = '${instanceId}'.`);
} else {
    // An instance with the specified ID exists and is still active.
    console.log(`An instance with ID '${instanceId}' already exists.`);
}
The Durable Task SDK isn't available for PowerShell. Use Durable Functions instead.
How the singleton pattern works
Because an instance ID is unique within a task hub, you can prevent duplicate concurrent runs by always scheduling the orchestration under a known, fixed ID and checking the status of that ID before you start it. By default, instance IDs are randomly generated GUIDs. The preceding examples, however, pass a specific instance ID. The code first fetches the metadata for the orchestration instance to check whether an instance with that ID is already running. If no such instance is running, the code creates a new instance with that ID.
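The decision described above can be sketched independently of any SDK. The following is a minimal illustration under stated assumptions: `RuntimeStatus`, `should_start`, `InMemoryTaskHub`, and `start_if_idle` are hypothetical names, and the in-memory hub only mimics the fact that a task hub holds one entry per instance ID.

```python
from enum import Enum
from typing import Optional


class RuntimeStatus(Enum):
    """Hypothetical status enum that mirrors the statuses used in this article."""
    PENDING = "Pending"
    RUNNING = "Running"
    COMPLETED = "Completed"
    FAILED = "Failed"
    TERMINATED = "Terminated"


# Statuses that mean the instance has stopped and its ID can be reused.
TERMINAL = {RuntimeStatus.COMPLETED, RuntimeStatus.FAILED, RuntimeStatus.TERMINATED}


def should_start(existing_status: Optional[RuntimeStatus]) -> bool:
    """True when no instance exists or the existing one has stopped running."""
    return existing_status is None or existing_status in TERMINAL


class InMemoryTaskHub:
    """Hypothetical stand-in for a task hub: one status entry per instance ID."""

    def __init__(self):
        self._instances = {}

    def get_status(self, instance_id):
        return self._instances.get(instance_id)

    def schedule(self, instance_id):
        self._instances[instance_id] = RuntimeStatus.RUNNING

    def complete(self, instance_id):
        self._instances[instance_id] = RuntimeStatus.COMPLETED


def start_if_idle(hub, instance_id):
    """Check the fixed ID first, then schedule only if nothing is active."""
    if should_start(hub.get_status(instance_id)):
        hub.schedule(instance_id)
        return "started"
    return "conflict"


hub = InMemoryTaskHub()
first = start_if_idle(hub, "singleton-job")   # no instance exists yet
second = start_if_idle(hub, "singleton-job")  # the first run is still active
hub.complete("singleton-job")
third = start_if_idle(hub, "singleton-job")   # the previous run has finished
print(first, second, third)  # started conflict started
```

Treating a pending instance as active matters here: an instance that was scheduled but hasn't started yet should still block a second start.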
The orchestrator function itself can use any pattern: a standard function that starts and completes, or an eternal orchestration that runs continuously. The singleton pattern only controls how many instances run at the same time.
Next steps