Voor achtergrondtaken moet u er vaak voor zorgen dat slechts één exemplaar van een bepaalde Orchestrator tegelijk wordt uitgevoerd, waardoor dubbele orkestraties niet gelijktijdig kunnen plaatsvinden. U kunt dit singleton-patroon implementeren in Durable Functions of de Urable Task SDK's door een specifieke exemplaar-id toe te wijzen aan een orchestrator bij het maken en vervolgens te controleren of een exemplaar met die id al wordt uitgevoerd voordat een nieuwe wordt gestart.
In dit artikel wordt beschreven hoe u singleton-orchestrators implementeert met codevoorbeelden voor elke ondersteunde taal.
Prerequisites
Opmerking
Er is een potentiële racevoorwaarde in het singleton-patroon. Als twee clients gelijktijdig de controle- en startlogica uitvoeren, kunnen beide aanroepen een succes melden, maar slechts één orchestrationexemplaar wordt daadwerkelijk gestart. Afhankelijk van uw vereisten kan dit ongewenste bijwerkingen hebben. Als strikte garanties voor één exemplaar vereist zijn, kunt u extra vergrendelingsmechanismen toevoegen.
Belangrijk
Momenteel is de Durable Task SDK van PowerShell niet beschikbaar.
Voorbeeld van Singleton orchestrator
In het volgende voorbeeld ziet u een HTTP-triggerfunctie waarmee een singleton-achtergrondtaakindeling wordt gemaakt. De code probeert ervoor te zorgen dat er slechts één actief exemplaar bestaat voor een opgegeven exemplaar-id.
[Function("HttpStartSingle")]
public static async Task<HttpResponseData> RunSingle(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "orchestrators/{functionName}/{instanceId}")] HttpRequestData req,
[DurableClient] DurableTaskClient starter,
string functionName,
string instanceId,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("HttpStartSingle");
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
OrchestrationMetadata? existingInstance = await starter.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
|| existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
await starter.ScheduleNewOrchestrationInstanceAsync(functionName, requestBody, new StartOrchestrationOptions { InstanceId = instanceId });
logger.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return await starter.CreateCheckStatusResponseAsync(req, instanceId);
}
else
{
// An instance with the specified ID exists or an existing one still running, don't create one.
var response = req.CreateResponse(HttpStatusCode.Conflict);
await response.WriteStringAsync($"An instance with ID '{instanceId}' already exists.");
return response;
}
}
Opmerking
De vorige C#-code is voor het geïsoleerde werkrolmodel. Dit is het aanbevolen model voor .NET-apps. Zie het artikel Durable Functions versies voor meer informatie over de verschillen tussen de in-proces- en geïsoleerde werkrolmodellen.
Belangrijk
In dit JavaScript-voorbeeld wordt het Node.js programmeermodel v3 gebruikt, dat gebruikmaakt vanfunction.json. Als u het Node.js programmeermodel v4 (de huidige standaardinstelling) gebruikt, gebruikt u in plaats daarvan het v4-code-first-patroon.
function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}/{instanceId}",
"methods": ["post"]
},
{
"name": "starter",
"type": "orchestrationClient",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
]
}
index.js
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instanceId = req.params.instanceId;
const functionName = req.params.functionName;
// Check if an instance with the specified ID already exists or an existing one stopped running(completed/failed/terminated).
const existingInstance = await client.getStatus(instanceId);
if (!existingInstance
|| existingInstance.runtimeStatus == "Completed"
|| existingInstance.runtimeStatus == "Failed"
|| existingInstance.runtimeStatus == "Terminated") {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
const eventData = req.body;
await client.startNew(functionName, instanceId, eventData);
context.log(`Started orchestration with ID = '${instanceId}'.`);
return client.createCheckStatusResponse(req, instanceId);
} else {
// An instance with the specified ID exists or an existing one still running, don't create one.
return {
status: 409,
body: `An instance with ID '${instanceId}' already exists.`,
};
}
};
function_app.py
import logging
import azure.functions as func
import azure.durable_functions as df
app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)
@app.route(route="orchestrators/{functionName}/{instanceId}", methods=["POST"])
@app.durable_client_input(client_name="client")
async def http_start_single(req: func.HttpRequest, client):
instance_id = req.route_params["instanceId"]
function_name = req.route_params["functionName"]
existing_instance = await client.get_status(instance_id)
if not existing_instance or existing_instance.runtime_status in [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Failed,
df.OrchestrationRuntimeStatus.Terminated,
]:
event_data = req.get_body()
instance_id = await client.start_new(function_name, instance_id, event_data)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
return func.HttpResponse(
body=f"An instance with ID '{instance_id}' already exists.",
status_code=409,
)
using namespace System.Net
param($Request, $TriggerMetadata)
$FunctionName = $Request.Params.FunctionName
$InstanceId = $Request.Params.InstanceId
# Check if an instance with the specified ID already exists
$existingInstance = Get-DurableStatus -InstanceId $InstanceId
if (-not $existingInstance -or
$existingInstance.RuntimeStatus -eq "Completed" -or
$existingInstance.RuntimeStatus -eq "Failed" -or
$existingInstance.RuntimeStatus -eq "Terminated") {
# An instance with the specified ID doesn't exist or stopped running, create one
$InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -InstanceId $InstanceId -Input $Request.Body
Write-Host "Started orchestration with ID = '$InstanceId'."
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
}
else {
# An instance with the specified ID exists or still running
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::Conflict
Body = "An instance with ID '$InstanceId' already exists."
})
}
@FunctionName("HttpStartSingle")
public HttpResponseMessage runSingle(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/{instanceId}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceId = req.getHeaders().getOrDefault("instanceId", "singleton-job");
DurableTaskClient client = durableContext.getClient();
// Check to see if an instance with this ID is already running
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, false);
if (metadata == null || !metadata.isRunning()) {
// No such instance exists or it finished - create a new one.
// De-dupe is handled automatically in the storage layer if another
// function tries to also use this instance ID.
client.scheduleNewOrchestrationInstance("MyOrchestration", null, instanceId);
return durableContext.createCheckStatusResponse(req, instanceId);
}
return req.createResponseBuilder(HttpStatus.CONFLICT)
.body("An instance with ID '" + instanceId + "' already exists.")
.build();
}
In het volgende voorbeeld ziet u hoe u een singleton-orchestratie maakt met behulp van de Durable Task SDK's. De code probeert ervoor te zorgen dat er slechts één actief exemplaar bestaat voor een opgegeven exemplaar-id.
using Microsoft.DurableTask.Client;
// Check if an instance with the specified ID already exists
string instanceId = "singleton-job";
OrchestrationMetadata? existingInstance = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: false);
if (existingInstance == null ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed ||
existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.ScheduleNewOrchestrationInstanceAsync("MyOrchestration", input, new StartOrchestrationOptions(instanceId));
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
}
else
{
// An instance with the specified ID exists or an existing one still running.
Console.WriteLine($"An instance with ID '{instanceId}' already exists.");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
instance_id = "singleton-job"
# Check if an instance with the specified ID already exists
existing_instance = client.get_orchestration_state(instance_id)
if (existing_instance is None or
existing_instance.runtime_status in ['COMPLETED', 'FAILED', 'TERMINATED']):
# An instance with the specified ID doesn't exist or an existing one stopped running, create one.
client.schedule_new_orchestration(my_orchestration, input=input_data, instance_id=instance_id)
print(f"Started orchestration with ID = '{instance_id}'.")
else:
# An instance with the specified ID exists or an existing one still running.
print(f"An instance with ID '{instance_id}' already exists.")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
String instanceId = "singleton-job";
// Check to see if an instance with this ID is already running
OrchestrationMetadata existingInstance = client.getInstanceMetadata(instanceId, false);
if (existingInstance == null || !existingInstance.isRunning()) {
// An instance doesn't exist or finished - create one
client.scheduleNewOrchestrationInstance("MyOrchestration", input, instanceId);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
} else {
// An instance with the specified ID exists and is still running
System.out.println("An instance with ID '" + instanceId + "' already exists.");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
const instanceId = "singleton-job";
// Check if an instance with the specified ID already exists
const existingInstance = await client.getOrchestrationState(instanceId, false);
if (!existingInstance ||
existingInstance.runtimeStatus === OrchestrationStatus.COMPLETED ||
existingInstance.runtimeStatus === OrchestrationStatus.FAILED ||
existingInstance.runtimeStatus === OrchestrationStatus.TERMINATED) {
// An instance with the specified ID doesn't exist or an existing one stopped running, create one.
await client.scheduleNewOrchestration("MyOrchestration", input, instanceId);
console.log(`Started orchestration with ID = '${instanceId}'.`);
} else {
// An instance with the specified ID exists or an existing one still running.
console.log(`An instance with ID '${instanceId}' already exists.`);
}
De Durable Task SDK is niet beschikbaar voor PowerShell. Gebruik in plaats daarvan Durable Functions.
Hoe het singleton-patroon werkt
Omdat exemplaar-id's uniek zijn binnen een taakhub, zorgt het plannen van een orchestratie met een bekende, vaste id en eerst de status controleren ervoor dat dubbele gelijktijdige uitvoeringen worden voorkomen. Instantie-id's worden standaard willekeurig gegenereerde GUID's. In de vorige voorbeelden wordt echter een specifieke exemplaar-id doorgegeven. De code haalt vervolgens de metagegevens van het orkestratie-exemplaar op om te controleren of een exemplaar met die ID al wordt uitgevoerd. Als er geen dergelijke instantie wordt uitgevoerd, wordt er een nieuwe instantie met die ID gemaakt.
De orchestratorfunctie zelf kan elk patroon gebruiken: een standaardfunctie die wordt gestart en voltooid, of een eeuwige indeling die continu wordt uitgevoerd. Het singleton-patroon bepaalt alleen hoeveel exemplaren gelijktijdig worden uitgevoerd.
Volgende stappen