Applications console et SynchronizationContext

Les frameworks d’interface utilisateur tels que Windows Forms, WPF et .NET MAUI installent un SynchronizationContext sur leur thread d’interface utilisateur. Lorsque vous effectuez await une tâche dans ces environnements, la continuation renvoie automatiquement au thread d’interface utilisateur. Les applications console n’installent pas un SynchronizationContext, ce qui signifie que await les continuations s’exécutent sur le pool de threads. Cet article explique les conséquences et montre comment créer une pompe de messages à thread unique quand vous en avez besoin.

Comportement par défaut dans une application console

Dans une application console, SynchronizationContext.Current retourne null. Lorsqu’une méthode génère un await, la continuation s’exécute sur le thread du pool de threads disponible :

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Sortie représentative de l’exécution de ce programme :

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Le thread 1 (le thread principal) n’apparaît qu’une seule fois, pendant la première itération synchrone avant await Task.Yield() de suspendre la méthode. Toutes les itérations suivantes s’exécutent sur des threads du pool.

Points d’entrée asynchrones modernes

À compter de C# 7.1, vous pouvez déclarer Main en tant que async Task ou async Task<int>. En C# 9 et versions ultérieures, vous pouvez utiliser des instructions de niveau supérieur avec await directement :

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Ces points d’entrée n’installent pas un SynchronizationContext. Le runtime génère un bootstrap qui appelle votre méthode asynchrone et bloque sur le résultat retourné Task, comme lors de l'appel à .GetAwaiter().GetResult(). Les continuations s’exécutent toujours sur le pool de threads.

Quand vous avez besoin d’une affinité de fil

Pour de nombreuses applications console, l’exécution de continuations sur le pool de threads est correcte. Toutefois, certains scénarios nécessitent que toutes les continuations s’exécutent sur un thread unique :

  • Exécution sérialisée : plusieurs opérations asynchrones simultanées partagent l’état sans verrous en exécutant leurs continuations sur le même thread.
  • Exigences en matière de bibliothèque : certaines bibliothèques ou objets COM nécessitent une affinité avec un thread particulier.
  • Tests unitaires : les frameworks de test peuvent avoir besoin d’une exécution déterministe et monothread de code asynchrone.

Créer un SynchronizationContext à thread unique

Pour exécuter toutes les continuations sur un thread, vous avez besoin de deux éléments :

  1. SynchronizationContext Dont Post les files d’attente de méthodes fonctionnent sur une collection thread-safe.
  2. Boucle de pompe de messages qui traite cette file d’attente sur le thread cible.

Contexte personnalisé

Le contexte utilise un BlockingCollection<T> pour coordonner les producteurs (les continuations asynchrones) et un consommateur (la boucle de pompe) :

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Méthode AsyncPump.Run

AsyncPump.Run installe le contexte personnalisé, appelle la méthode asynchrone et pompe les continuations sur le thread appelant jusqu’à ce que la méthode se termine :

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Voir cela en action

Remplacez l’appel par défaut par AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Output:

[1, 10000]

L’ID de thread spécifique peut différer selon le runtime et la plateforme, mais le résultat clé est que toutes les 10 000 itérations s’exécutent sur un seul thread : le thread principal.

Gérer les méthodes async void

La surcharge Func<Task> suit l’achèvement par le retour Task. Les méthodes async void ne renvoient pas de tâche ; à la place, elles notifient l'appelant actuel SynchronizationContext via OperationStarted() et OperationCompleted(). Pour prendre en charge les méthodes asynchrones void , étendez le contexte pour suivre les opérations en attente :

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Avec le suivi des opérations activé, la pompe se ferme uniquement lorsque toutes les méthodes asynchrones void en attente se terminent, pas seulement la tâche de niveau supérieur.

Considérations pratiques

  • Risque d’interblocage : si le code s’exécutant à l’intérieur de blocs AsyncPump.Run de façon synchrone (par exemple, en appelant .Result ou .Wait() sur une tâche dont la continuation doit être renvoyée au thread de la pompe), le thread de pompe ne peut pas traiter cette continuation. Le résultat est un interblocage. Le même problème est décrit dans les wrappers synchrones pour les méthodes asynchrones.
  • Performances : une pompe mono-thread limite le débit à un seul thread. Utilisez cette approche uniquement lorsque l’affinité de thread est importante.
  • Multiplateforme : l’implémentation présentée ici utilise uniquement les types des espaces de noms System.Collections.Concurrent et System.Threading. Il fonctionne sur toutes les plateformes prises en charge par .NET.

Voir également