Partilhar via


Biblioteca System.Threading.Channels

O System.Threading.Channels namespace fornece um conjunto de estruturas de dados de sincronização para passar dados entre produtores e consumidores de forma assíncrona. A biblioteca tem como alvo .NET, .NET Standard e .NET Framework, e funciona em todas as implementações .NET.

Esta biblioteca está disponível no 📦 pacote System.Threading.Channels NuGet. No entanto, se estiveres a usar .NET Core 3.0 ou posterior, o pacote está incluído como parte do framework partilhado.

Modelo de programação conceptual produtor/consumidor

Os canais são uma implementação do modelo de programação conceptual produtor/consumidor. Neste modelo de programação, os produtores produzem dados de forma assíncrona e os consumidores consomem esses dados de forma assíncrona. Em outras palavras, esse modelo passa dados de uma parte para outra por meio de uma fila FIFO ("first-in, first-out"). Pense em canais como qualquer outro tipo de coleção genérica comum, como um List<T>. A principal diferença é que essa coleção gerencia a sincronização e fornece vários modelos de consumo por meio de opções de criação de fábrica. Estas opções controlam o comportamento dos canais, tais como:

  • Quantos elementos podem armazenar e o que acontece se esse limite for atingido.
  • Quer o canal seja acedido por múltiplos produtores ou por vários consumidores em simultâneo.

Utilização básica

O exemplo seguinte demonstra a utilização básica de um canal, onde um produtor escreve itens e um consumidor lê-los:

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

Estratégias de delimitação

Dependendo de como um Channel<T> é criado, seu leitor e escritor se comportam de forma diferente.

Para criar um canal que especifique uma capacidade máxima, chame Channel.CreateBounded. Para criar um canal que seja usado por qualquer número de leitores e escritores simultaneamente, chame Channel.CreateUnbounded. Cada estratégia de delimitação expõe várias opções definidas pelo criador, seja BoundedChannelOptions ou UnboundedChannelOptions.

Nota

Independentemente da estratégia de limitação, um canal sempre lança um ChannelClosedException quando é utilizado após ser encerrado.

Canais ilimitados

Para criar um canal ilimitado, chame uma das Channel.CreateUnbounded sobrecargas:

var channel = Channel.CreateUnbounded<T>();

Quando se cria um canal sem limites, por padrão, o canal pode ser utilizado por qualquer número de leitores e escritores simultaneamente. Como alternativa, você pode especificar um comportamento não padrão ao criar um canal não limitado fornecendo uma UnboundedChannelOptions instância. A capacidade do canal é ilimitada e todas as gravações são realizadas de forma síncrona. Para obter mais exemplos, consulte Padrões de criação ilimitados.

Canais delimitados

Para criar um canal limitado, utilize uma das sobrecargas de Channel.CreateBounded.

var channel = Channel.CreateBounded<T>(7);

O código anterior cria um canal que tem uma capacidade máxima de 7 itens. Quando você cria um canal limitado, o canal é vinculado a uma capacidade máxima. Quando o limite é atingido, o comportamento padrão é que o canal bloqueia assincronamente o produtor até que o espaço fique disponível. Você pode configurar esse comportamento especificando uma opção ao criar o canal. Os canais delimitados podem ser criados com qualquer valor de capacidade superior a zero. Para outros exemplos, consulte Padrões de criação limitados.

Comportamento do modo completo

Ao usar um canal limitado, você pode especificar o comportamento ao qual o canal adere quando o limite configurado é atingido. A tabela a seguir lista os comportamentos de modo completo para cada BoundedChannelFullMode valor:

Valor Comportamento
BoundedChannelFullMode.Wait Este é o valor predefinido. Chamadas para WriteAsync aguardam que espaço esteja disponível para concluir a operação de gravação. Chamadas para TryWrite retornam false imediatamente.
BoundedChannelFullMode.DropNewest Remove e ignora o item mais recente no canal para abrir espaço para o item que está a ser escrito.
BoundedChannelFullMode.DropOldest Remove e ignora o item mais antigo no canal para abrir espaço para o item que está sendo escrito.
BoundedChannelFullMode.DropWrite Descarta o item que está sendo escrito.

Importante

Sempre que um Channel<TWrite,TRead>.Writer produz mais rápido do que um Channel<TWrite,TRead>.Reader pode consumir, o escritor do canal experimenta pressão de volta.

APIs produtoras

A funcionalidade do produtor é exposta no Channel<TWrite,TRead>.Writer. As APIs do produtor e o comportamento esperado são detalhados na tabela a seguir:

API Comportamento esperado
ChannelWriter<T>.Complete Marca o canal como completo, o que significa que não há mais itens gravados nele.
ChannelWriter<T>.TryComplete Tenta marcar o canal como concluído, indicando que não serão mais gravados dados nele.
ChannelWriter<T>.TryWrite Tenta escrever o item especificado no canal. Quando usado com um canal não limitado, isso sempre retorna true, a menos que o escritor do canal sinalize a conclusão com ChannelWriter<T>.Complete ou ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Retorna um ValueTask<TResult> que é concluído quando há espaço disponível para escrever um item.
ChannelWriter<T>.WriteAsync Grava um item de forma assíncrona no canal.

APIs para Consumidor

A funcionalidade do utilizador está disponível no Channel<TWrite,TRead>.Reader. As APIs do consumidor e o comportamento esperado são detalhados na tabela a seguir:

API Comportamento esperado
ChannelReader<T>.ReadAllAsync Cria um IAsyncEnumerable<T> que permite ler todos os dados do canal.
ChannelReader<T>.ReadAsync Lê de forma assíncrona um item do canal.
ChannelReader<T>.TryPeek Tentativas de consultar um artigo do canal.
ChannelReader<T>.TryRead Tenta efetuar a leitura de um item do canal.
ChannelReader<T>.WaitToReadAsync Retorna um ValueTask<TResult> que é finalizado quando os dados estão disponíveis para leitura.

Padrões de utilização comuns

Existem vários padrões de utilização para os canais:

A API foi projetada para ser simples, consistente e o mais flexível possível. Todos os métodos assíncronos retornam um ValueTask (ou ValueTask<bool>) que representa uma operação assíncrona leve que pode evitar a alocação se a operação for concluída de forma síncrona e, potencialmente, até assíncrona. Além disso, a API é projetada para ser composta, na medida em que o criador de um canal faz promessas sobre seu uso pretendido. Quando um canal é criado com determinados parâmetros, a implementação interna pode operar de forma mais eficiente conhecendo essas promessas.

Padrões de criação

Imagine que está a criar uma solução de produtor/consumidor para um sistema de posição global (GPS). Você deseja rastrear as coordenadas de um dispositivo ao longo do tempo. Um objeto de coordenadas de exemplo pode ter esta aparência:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Padrões de criação ilimitados

Um padrão de uso comum é criar um canal padrão ilimitado:

var channel = Channel.CreateUnbounded<Coordinates>();

Mas, em vez disso, imagine que quer criar um canal ilimitado com múltiplos produtores e consumidores. Definir SingleWriter = false e SingleReader = false nas opções de canais:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Neste caso, todas as operações de escrita são síncronas, mesmo WriteAsync. Este comportamento ocorre porque um canal sem limites tem sempre imediatamente espaço disponível para uma escrita. No entanto, ao definir AllowSynchronousContinuations para true, as operações de escrita poderão acabar por fazer trabalho associado a um leitor ao executar as suas continuações. Esta definição não afeta a sincronicidade da operação.

Padrões de criação delimitados

Nos canais limitados , a configurabilidade do canal deve ser conhecida pelo consumidor para ajudar a garantir um consumo adequado. Ou seja, o consumidor deve saber qual o comportamento que o canal apresenta quando o limite configurado é atingido. Os exemplos seguintes mostram alguns dos padrões comuns de criação limitada.

A forma mais simples de criar um canal limitado é especificar uma capacidade. O código seguinte cria um canal limitado com capacidade máxima de 1.

var channel = Channel.CreateBounded<Coordinates>(1);

Existem outras opções disponíveis. Algumas opções são iguais a um canal ilimitado, enquanto outras são específicas para canais limitados. No código seguinte, o canal é criado como um canal limitado limitado a 1.000 itens, com um único escritor mas muitos leitores. Seu comportamento de modo completo é definido como DropWrite, o que significa que ele descarta o item que está sendo escrito se o canal estiver cheio.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Para observar itens que são descartados ao usar canais limitados, registe um callback itemDropped.

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Sempre que o canal está cheio e um novo item é adicionado, o itemDropped retorno de chamada é invocado. Neste exemplo, o callback fornecido escreve o item no console, mas está livre para executar qualquer outra ação que desejar.

Padrões de produção

Imagine que o produtor neste cenário está escrevendo novas coordenadas para o canal. O produtor pode fazê-lo ligando para TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

O código do gerador anterior:

  • Aceita o Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) como um argumento, juntamente com o inicial Coordinates.
  • Define um loop condicional while que tenta mover as coordenadas usando TryWrite.

Um produtor alternativo pode utilizar o WriteAsync método:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Novamente, o Channel<Coordinates>.Writer é usado dentro de um while loop. Mas, desta vez, chama-se o método WriteAsync. O método continua somente depois que as coordenadas foram gravadas. Quando o while loop sai, é feita uma chamada para Complete, o que sinaliza que não há mais dados a serem gravados no canal.

Outro padrão de produtor é usar o método WaitToWriteAsync, considere o seguinte código:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Como parte do condicional while, o resultado da WaitToWriteAsync chamada é usado para determinar se o loop deve continuar.

Padrões de consumo

Existem vários padrões comuns de consumo de canais. Quando um canal não termina, o que significa que produz dados indefinidamente, o consumidor pode usar um while (true) loop e ler os dados à medida que ficam disponíveis:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Nota

Este código lança uma exceção se o canal estiver fechado.

Um consumidor alternativo poderia evitar essa preocupação usando um loop while aninhado, conforme mostrado no código a seguir:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

No código anterior, o consumidor aguarda para ler os dados. Uma vez que os dados estão disponíveis, o consumidor tenta lê-los. Esses loops continuam a avaliar até que o produtor do canal sinalize que não tem mais dados para serem lidos. Dito isto, quando um produtor é conhecido por produzir um número finito de itens e sinaliza a conclusão, o consumidor pode usar await foreach semânticas para iterar sobre os itens:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

O código anterior usa o ReadAllAsync método para ler todas as coordenadas do canal.

Múltiplos produtores e consumidores

Os canais suportam múltiplos produtores e consumidores em simultâneo. Para permitir isto, crie um canal com SingleWriter = false e SingleReader = false nas opções de canal. Depois, distribui-se a escrita em várias tarefas dos produtores e converge-se a leitura em várias tarefas dos consumidores.

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

O código anterior:

  • Cria um canal ilimitado que suporta explicitamente múltiplos escritores e leitores em simultâneo.
  • Inicia três tarefas de produtores simultâneos, cada uma escreve uma série de coordenadas com um identificador único de dispositivo.
  • Inicia duas tarefas de consumo simultâneas, cada uma a partir do mesmo canal usando ReadAllAsync.
  • Espera que todos os produtores terminem, depois chama Complete para sinalizar que não há mais dados escritos para o canal.
  • Espera que todos os consumidores terminem de drenar os dados restantes do canal.

Sugestão

Depois de vários produtores, só chame channel.Writer.Complete() depois que todos terminarem de escrever. Isto sinaliza que não há mais dados escritos, permitindo ReadAllAsync() concluir após consumir todos os itens restantes.

Consulte também