Padrão fan-out/fan-in

Use o padrão fan-out/fan-in para executar várias funções em paralelo e, em seguida, agregar os resultados. Esse padrão é uma abordagem comum para processamento paralelo em fluxos de trabalho sem servidor do Azure. Neste tutorial, você implementará o padrão fan-out/fan-in com Durable Functions para fazer backup do conteúdo do site de um aplicativo para o Armazenamento do Azure.

Pré-requisitos

Utilize o padrão fan-out/fan-in para processamento paralelo na orquestração de fluxos de trabalho.

  • Distribua o trabalho entre várias atividades em execução simultaneamente.
  • Realizar fan-in, agregando os resultados.

Neste tutorial, você implementa o padrão fan-out/fan-in com os SDKs de Tarefa Durável para .NET, JavaScript, Python e Java.

Visão geral do cenário

Este exemplo demonstra o processamento paralelo carregando todos os arquivos em um diretório (recursivamente) no Armazenamento de Blobs do Azure e contando o total de bytes carregados.

Uma única função pode lidar com o upload, mas não é dimensionada. Uma execução de função é executada em uma VM (máquina virtual), portanto, a taxa de transferência é limitada a essa VM. A confiabilidade é outra preocupação: se o processo falhar no meio ou levar mais de cinco minutos, o backup terminará em um estado parcialmente concluído e deverá ser reiniciado.

Uma abordagem baseada em fila com duas funções melhora a taxa de transferência e a confiabilidade, mas introduz complexidade para gerenciamento e coordenação de estado, como relatar o total de bytes carregados.

O Durable Functions oferece processamento paralelo, confiabilidade e coordenação com sobrecarga mínima, sem necessidade de gerenciamento de fila.

Neste exemplo, um orquestrador de fluxo de trabalho distribui o trabalho entre várias atividades para processamento paralelo, e em seguida, agrega os resultados. Utilize o padrão fan-out/fan-in quando precisar:

  • Processar um lote de itens em que cada item pode ser tratado de forma independente
  • Distribuir o trabalho em vários computadores para obter uma melhor taxa de transferência
  • Agregar resultados de todas as operações paralelas

Sem esse padrão, você processa itens sequencialmente (limitando a taxa de transferência) ou cria sua própria lógica de enfileiramento e coordenação (adicionando complexidade). Os SDKs de Tarefa Durável lidam com a paralelização e a coordenação para você, tornando o padrão fan-out/fan-in fácil de implementar.

Componentes de função

Este artigo descreve as funções no aplicativo de exemplo:

  • E2_BackupSiteContent: uma função de orquestrador que chama E2_GetFileList para obter uma lista de arquivos para fazer backup e, em seguida, chama E2_CopyFileToBlob para cada arquivo.
  • E2_GetFileList: uma função de atividade que retorna uma lista de arquivos em um diretório.
  • E2_CopyFileToBlob: uma função de atividade que faz backup de um único arquivo para Azure Blob Storage.

Este artigo descreve os componentes no código de exemplo:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator, ou FanOutFanIn_WordCount: um orquestrador que distribui o trabalho para várias atividades em paralelo, aguarda a conclusão de todas as atividades e então agrega os resultados.
  • ProcessWorkItemActivity, processWorkItemou process_work_itemCountWords: uma atividade que processa um único item de trabalho.
  • AggregateResultsActivity, aggregateResultsou aggregate_results: uma atividade que agrega resultados de todas as operações paralelas.

Orquestrador

Essa função de orquestrador faz as seguintes tarefas:

  1. Usa rootDirectory como entrada.
  2. Chama uma função para obter uma lista recursiva de arquivos em rootDirectory.
  3. Faz chamadas de função paralelas para carregar cada arquivo para Azure Blob Storage.
  4. Aguarda que todos os uploads sejam concluídos.
  5. Retorna o número total de bytes carregados para Azure Blob Storage.

O código a seguir demonstra a implementação da função de orquestrador:

Modelo isolado
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_BackupSiteContent")]
    public static async Task<long> Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        string rootDirectory = context.GetInput<string>()?.Trim();
        if (string.IsNullOrEmpty(rootDirectory))
        {
            rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
        }

        string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);

        Task<long>[] tasks = files
            .Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
            .ToArray();

        long[] results = await Task.WhenAll(tasks);
        return results.Sum();
    }
}

Observe a linha await Task.WhenAll(tasks);. O código não aguarda as chamadas individuais a E2_CopyFileToBlob, portanto, elas são executadas em paralelo. Quando o orquestrador passa a matriz de tarefas para Task.WhenAll, ele retorna uma tarefa que só é concluída quando todas as operações de cópia são concluídas. Se você estiver familiarizado com a TPL (Biblioteca Paralela de Tarefas) em .NET, esse padrão será familiar. Com a extensão Durable Functions, essas tarefas são executadas em várias máquinas virtuais simultaneamente e a execução de ponta a ponta é resiliente à reciclagem de processos.

Após o orquestrador aguardar Task.WhenAll, todas as chamadas de função são concluídas e retornam valores. Cada chamada para E2_CopyFileToBlob retorna o número de bytes enviados. Calcule o total adicionando os valores retornados.


Modelo em processo
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Observação

O exemplo de modelo em processo utiliza pacotes obsoletos em processo. O código anterior mostra o modelo de trabalho isolado do .NET recomendado.


O orquestrador realiza as seguintes tarefas:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Realiza fan-out, criando uma tarefa para cada item de trabalho e processando-os em paralelo.
  3. Aguarda a conclusão de todas as tarefas paralelas.
  4. Realiza fan-in, agregando os resultados.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Use Task.WhenAll() para aguardar a conclusão de todas as tarefas paralelas. O SDK de Tarefas Duráveis garante que as tarefas possam ser executadas em vários computadores simultaneamente e que a execução seja resiliente a reinicializações de processos.

Atividades

As funções de atividade auxiliar são funções regulares usando a activityTrigger vinculação.

Função de atividade de E2_GetFileList

Modelo isolado
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_GetFileList")]
    public static string[] GetFileList(
        [ActivityTrigger] string rootDirectory,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_GetFileList");
        logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);

        string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
        logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);

        return files;
    }
}

Modelo em processo
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Observação

Não coloque esse código na função de orquestrador. As funções de orquestrador não devem fazer E/S, incluindo o acesso ao sistema de arquivos local. Para obter mais informações, confira Restrições de código na função de orquestrador.

Função de atividade de E2_CopyFileToBlob

Modelo isolado

Observação

Para executar o código de exemplo, instale o pacote NuGet Azure.Storage.Blobs.

using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_CopyFileToBlob")]
    public static async Task<long> CopyFileToBlob(
        [ActivityTrigger] string filePath,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
        long byteCount = new FileInfo(filePath).Length;

        string blobPath = filePath
            .Substring(Path.GetPathRoot(filePath)!.Length)
            .Replace('\\', '/');
        string outputLocation = $"backups/{blobPath}";

        string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
        }

        BlobContainerClient containerClient = new(connectionString, "backups");
        await containerClient.CreateIfNotExistsAsync();
        BlobClient blobClient = containerClient.GetBlobClient(blobPath);

        logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);

        await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
        await blobClient.UploadAsync(source, overwrite: true);

        return byteCount;
    }
}

Modelo em processo
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Observação

O exemplo de modelo em processo requer o pacote NuGet Microsoft.Azure.WebJobs.Extensions.Storage e usa recursos de associação do Azure Functions como o Binder parâmetro.


A implementação carrega o arquivo do disco e transmite de forma assíncrona o conteúdo para um blob com o mesmo nome no backups contêiner. A função retorna o número de bytes copiados para o armazenamento. O orquestrador usa esse valor para calcular a soma agregada.

Observação

Este exemplo move as operações de E/S para uma activityTrigger função. O trabalho pode ser executado em vários computadores e dá suporte à checagem de progresso. Se o processo de host terminar, você saberá quais uploads estão concluídos.

As atividades fazem o trabalho. Ao contrário dos orquestradores, as atividades podem executar operações de E/S e lógica não determinística.

Processar atividade do item de trabalho

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Atividade de resultados agregados

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

Execute o exemplo de fan-out/fan-in

Inicie a orquestração no Windows enviando a seguinte solicitação HTTP POST:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Como alternativa, em um aplicativo de funções do Linux, inicie a orquestração enviando a seguinte solicitação HTTP POST. Python atualmente é executado no Linux para Serviço de Aplicativo:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Observação

A HttpStart função espera JSON. Inclua o Content-Type: application/json cabeçalho e codifique o caminho do diretório como uma cadeia de caracteres JSON. O extrato HTTP pressupõe que host.json tenha uma entrada que remove o prefixo padrão api/ de todas as URLs de função de gatilho HTTP. Encontre a marcação para esta configuração no arquivo host.json de exemplo.

Esta solicitação HTTP dispara o orquestrador E2_BackupSiteContent e passa a cadeia de caracteres D:\home\LogFiles como um parâmetro. A resposta tem um link para verificar o status da operação de backup:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Dependendo do número de arquivos de log em seu aplicativo de funções, essa operação pode levar vários minutos para ser concluída. Obtenha o status mais recente consultando a URL no Location cabeçalho da resposta HTTP 202 anterior:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

Nesse caso, a função ainda está sendo executada. A resposta mostra a entrada que foi salva no estado do orquestrador e a hora da última atualização. Use o valor do cabeçalho Location para sondar a conclusão. Quando o status é "Concluído", a resposta se assemelha ao seguinte exemplo:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

A resposta mostra que a orquestração está concluída e o tempo aproximado de conclusão. O campo output indica que a orquestração carregou cerca de 450 KB de logs.

Para executar o exemplo:

  1. Inicie o emulador do Agendador de Tarefas Duráveis para desenvolvimento local.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Inicia o worker para registrar o orquestrador e as atividades.

  3. Execute o cliente para agendar uma orquestração com uma lista de itens de trabalho:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Próximas Etapas 

Este exemplo mostra o padrão fan-out/fan-in. O exemplo a seguir mostra como implementar o padrão de monitor com temporizadores duráveis.

Este artigo demonstra o padrão fan-out/fan-in. Explore mais padrões e recursos:

Para obter exemplos do SDK do JavaScript, consulte os exemplos do SDK do JavaScript da Tarefa Durável.