SynchronizationContext e applicazioni console

Frameworks dell'interfaccia utente come Windows Forms, macchine virtuali Windows e .NET MAUI installano SynchronizationContext nel thread dell'interfaccia utente. Quando si esegue await un'attività in tali ambienti, la continuazione esegue automaticamente il postback al thread dell'interfaccia utente. Le app console non installano un file SynchronizationContext, il che significa che le continuazioni await vengono eseguite nel pool di thread. Questo articolo illustra le conseguenze e illustra come creare una pompa di messaggi a thread singolo quando necessario.

Comportamento predefinito in un'applicazione console

In un'app console, SynchronizationContext.Current restituisce null. Quando un metodo restituisce un oggetto await, la continuazione viene eseguita su qualsiasi thread del pool di thread disponibile:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Output rappresentativo dall'esecuzione di questo programma:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Il thread 1 (thread principale) viene visualizzato una sola volta, durante la prima iterazione sincrona prima di await Task.Yield() sospendere il metodo. Tutte le iterazioni successive vengono eseguite sui thread del pool di thread.

Punti di ingresso asincroni moderni

A partire da C# 7.1, è possibile dichiarare Main come async Task o async Task<int>. In C# 9 e versioni successive è possibile usare direttamente le istruzioni di primo livello con await :

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Questi punti di ingresso non installano un SynchronizationContext. Il runtime genera un bootstrap che chiama il tuo metodo asincrono e si blocca sull'oggetto restituito Task, in modo simile a .GetAwaiter().GetResult(). Le continuazioni vengono eseguite ancora nel pool di thread.

Quando è necessaria l'affinità di thread

Per molte app console, l'esecuzione di continuazioni nel pool di thread è corretta. Tuttavia, alcuni scenari richiedono che tutte le continuazioni vengano eseguite in un singolo thread:

  • Esecuzione serializzata: più operazioni asincrone simultanee condividono lo stato senza blocchi eseguendo le relative continuazioni nello stesso thread.
  • Requisiti della libreria: alcune librerie o oggetti COM richiedono l'affinità con un thread specifico.
  • Unit testing: i framework di test potrebbero richiedere l'esecuzione deterministica e a thread singolo del codice asincrono.

Creare un oggetto SynchronizationContext a thread singolo

Per eseguire tutte le continuazioni in un solo thread, sono necessari due elementi:

  1. Oggetto SynchronizationContext il cui Post metodo code funziona in una raccolta thread-safe.
  2. Ciclo di gestione dei messaggi che elabora la coda nel thread di destinazione.

Contesto personalizzato

Il contesto usa un BlockingCollection<T> per coordinare i produttori (le continuazioni asincrone) e il consumatore (ciclo di estrazione):

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Metodo AsyncPump.Run

AsyncPump.Run installa il contesto personalizzato, richiama il metodo asincrono e pompa le continuazioni nel thread chiamante fino al completamento del metodo:

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Vederlo in azione

Sostituire la chiamata predefinita con AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Risultato:

[1, 10000]

L'ID thread specifico può variare a seconda del runtime e della piattaforma, ma il risultato chiave è che tutte le 10.000 iterazioni vengono eseguite su un singolo thread: il thread principale.

Gestire i metodi void asincroni

Il sovraccarico Func<Task> tiene traccia del completamento attraverso il valore restituito Task. I metodi asincroni void non restituiscono un task, ma notificano il SynchronizationContext corrente tramite OperationStarted() e OperationCompleted(). Per supportare i metodi asincroni void , estendere il contesto per tenere traccia delle operazioni in sospeso:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Con il rilevamento delle operazioni abilitato, la pompa viene chiusa solo quando tutti i metodi asincroni void in sospeso vengono completati, non solo l'attività di primo livello.

Considerazioni pratiche

  • Rischio di deadlock: se il codice in esecuzione all'interno dei blocchi AsyncPump.Run opera sincronicamente (ad esempio, chiamando .Result o .Wait() su un'attività la cui continuazione deve fare il postback al thread della pompa), il thread della pompa non può elaborare tale continuazione. Il risultato è uno stallo. Lo stesso problema è descritto in Wrapper sincrono per i metodi asincroni.
  • Prestazioni: una pompa a thread singolo limita la velocità effettiva a un thread. Usare questo approccio solo quando l'affinità di thread è importante.
  • Multipiattaforma: l'implementazione mostrata qui utilizza solo tipi dai namespace System.Collections.Concurrent e System.Threading. Funziona su tutte le piattaforme supportate da .NET.

Vedere anche