SynchronizationContext en console-apps

UI-frameworks zoals Windows Forms, WPF en .NET MAUI installeren een SynchronizationContext op hun UI-thread. Wanneer u await een taak in deze omgevingen uitvoert, wordt de voortzetting automatisch teruggezet naar de UI-thread. Consoleapps installeren geen SynchronizationContext, wat betekent dat await voortzettingen worden uitgevoerd op de threadpool. In dit artikel worden de gevolgen uitgelegd en wordt uitgelegd hoe u een berichtpomp met één thread bouwt wanneer u er een nodig hebt.

Standaardgedrag in een console-app

In een console-app retourneert SynchronizationContext.Currentnull. Wanneer een methode resulteert in een await, wordt de voortzetting uitgevoerd op de threadgroep die beschikbaar is:

static void DefaultBehaviorDemo()
{
    DemoAsync().GetAwaiter().GetResult();
}

static async Task DemoAsync()
{
    var d = new Dictionary<int, int>();
    for (int i = 0; i < 10_000; i++)
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

        await Task.Yield();
    }

    foreach (var pair in d)
        Console.WriteLine(pair);
}

Representatieve uitvoer van het uitvoeren van dit programma:

[1, 1]
[3, 2687]
[4, 2399]
[5, 2397]
[6, 2516]

Thread 1 (de hoofdthread) wordt slechts één keer weergegeven, tijdens de eerste synchrone iteratie voordat await Task.Yield() de methode wordt onderbroken. Alle volgende iteraties worden uitgevoerd op Thread Pool-draad.

Moderne asynchrone toegangspunten

Vanaf C# 7.1 kunt u declareren Main als async Task of async Task<int>. In C# 9 en hoger kunt u instructies op het hoogste niveau rechtstreeks gebruiken met await :

// Top-level statements (C# 9+)
await DemoAsync();
// async Task Main (C# 7.1+)
static async Task Main()
{
    await DemoAsync();
}

Deze toegangspunten installeren geen SynchronizationContext. De runtime genereert een bootstrap die uw asynchrone methode aanroept en vervolgens wacht op het resultaat van de geretourneerde Task, vergelijkbaar met het aanroepen van .GetAwaiter().GetResult(). Vervolgen worden nog steeds uitgevoerd op de threadpool.

Wanneer u draad-affiniteit nodig hebt

Voor veel console-apps is het uitvoeren van continuaties op de threadpool prima. Voor sommige scenario's is echter vereist dat alle vervolgen worden uitgevoerd op één thread:

  • Geserialiseerde uitvoering: Meerdere gelijktijdige asynchrone bewerkingen delen de status zonder vergrendelingen door hun vervolgbewerkingen uit te voeren op dezelfde thread.
  • Bibliotheekvereisten: Voor sommige bibliotheken of COM-objecten is affiniteit met een bepaalde thread vereist.
  • Eenheidstests: Testframeworks hebben mogelijk deterministische, enkelvoudige uitvoering van asynchrone code nodig.

Een synchronisatiecontext met één thread bouwen

Als u alle vervolgen op één thread wilt uitvoeren, hebt u twee dingen nodig:

  1. Een SynchronizationContext wiens Post methodewachtrijen werken naar een thread-veilige verzameling.
  2. Een berichtenpomplus die die wachtrij op de doelthread verwerkt.

De aangepaste context

De context maakt gebruik van een BlockingCollection<T> om producenten (de asynchrone vervolgen) en een consument (de pomplus) te coördineren:

sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
Class SingleThreadSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

De methode AsyncPump.Run

AsyncPump.Run installeert de aangepaste context, roept de asynchrone methode aan en verwerkt continuaties in de aanroepende thread totdat de methode is voltooid.

static class AsyncPump
{
    public static void Run(Func<Task> func)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Task t;
            try
            {
                t = func();
            }
            catch
            {
                syncCtx.Complete();
                throw;
            }

            t.ContinueWith(
                _ => syncCtx.Complete(), TaskScheduler.Default);

            syncCtx.RunOnCurrentThread();

            t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
Class AsyncPump
    Public Shared Sub Run(func As Func(Of Task))
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New SingleThreadSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            Dim t As Task
            Try
                t = func()
            Catch
                syncCtx.Complete()
                Throw
            End Try

            t.ContinueWith(
                Sub(unused) syncCtx.Complete(), TaskScheduler.Default)

            syncCtx.RunOnCurrentThread()

            t.GetAwaiter().GetResult()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub

Zie het in actie

Vervang de standaardoproep door AsyncPump.Run:

static void AsyncPumpDemo()
{
    AsyncPump.Run(async () =>
    {
        var d = new Dictionary<int, int>();
        for (int i = 0; i < 10_000; i++)
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            d[id] = d.TryGetValue(id, out int count) ? count + 1 : 1;

            await Task.Yield();
        }

        foreach (var pair in d)
            Console.WriteLine(pair);
    });
}
Sub AsyncPumpDemo()
    AsyncPump.Run(
        Async Function() As Task
            Dim d As New Dictionary(Of Integer, Integer)()
            For i As Integer = 0 To 9999
                Dim id As Integer = Thread.CurrentThread.ManagedThreadId
                Dim count As Integer
                If d.TryGetValue(id, count) Then
                    d(id) = count + 1
                Else
                    d(id) = 1
                End If

                Await Task.Yield()
            Next

            For Each pair In d
                Console.WriteLine(pair)
            Next
        End Function)
End Sub

Uitvoer:

[1, 10000]

De specifieke thread-id kan verschillen, afhankelijk van de runtime en het platform, maar het belangrijkste resultaat is dat alle 10.000 iteraties worden uitgevoerd op één thread: de hoofdthread.

Asynchrone void-methoden verwerken

De Func<Task> overbelasting houdt de voltooiing bij via de geretourneerde Task. Asynchrone void methoden retourneren geen taak. In plaats daarvan melden ze de huidige SynchronizationContext gegevens via OperationStarted() en OperationCompleted(). Als u asynchrone void methoden wilt ondersteunen, kunt u de context uitbreiden om openstaande bewerkingen bij te houden:

    public static void Run(Action asyncMethod)
    {
        SynchronizationContext? prevCtx = SynchronizationContext.Current;
        try
        {
            var syncCtx = new AsyncVoidSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(syncCtx);

            Exception? caughtException = null;

            syncCtx.OperationStarted();
            try
            {
                asyncMethod();
            }
            catch (Exception ex)
            {
                caughtException = ex;
                syncCtx.Complete();
            }
            finally
            {
                syncCtx.OperationCompleted();
            }

            syncCtx.RunOnCurrentThread();

            if (caughtException is not null)
            {
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
            }
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(prevCtx);
        }
    }
}

sealed class AsyncVoidSynchronizationContext : SynchronizationContext
{
    private readonly
        BlockingCollection<KeyValuePair<SendOrPostCallback, object?>> _queue = new();
    private int _operationCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object?>(d, state));
    }

    public override void OperationStarted() =>
        Interlocked.Increment(ref _operationCount);

    public override void OperationCompleted()
    {
        if (Interlocked.Decrement(ref _operationCount) == 0)
            Complete();
    }

    public void RunOnCurrentThread()
    {
        while (_queue.TryTake(out KeyValuePair<SendOrPostCallback, object?> workItem,
            Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    public void Complete() => _queue.CompleteAdding();
}
    Public Shared Sub Run(asyncMethod As Action)
        Dim prevCtx As SynchronizationContext = SynchronizationContext.Current
        Try
            Dim syncCtx As New AsyncVoidSynchronizationContext()
            SynchronizationContext.SetSynchronizationContext(syncCtx)

            syncCtx.OperationStarted()
            Try
                asyncMethod()
            Catch
                syncCtx.Complete()
                Throw
            Finally
                syncCtx.OperationCompleted()
            End Try

            syncCtx.RunOnCurrentThread()
        Finally
            SynchronizationContext.SetSynchronizationContext(prevCtx)
        End Try
    End Sub
End Class

Class AsyncVoidSynchronizationContext
    Inherits SynchronizationContext

    Private ReadOnly _queue As New _
        BlockingCollection(Of KeyValuePair(Of SendOrPostCallback, Object))()
    Private _operationCount As Integer

    Public Overrides Sub Post(d As SendOrPostCallback, state As Object)
        _queue.Add(New KeyValuePair(Of SendOrPostCallback, Object)(d, state))
    End Sub

    Public Overrides Sub OperationStarted()
        Interlocked.Increment(_operationCount)
    End Sub

    Public Overrides Sub OperationCompleted()
        If Interlocked.Decrement(_operationCount) = 0 Then
            Complete()
        End If
    End Sub

    Public Sub RunOnCurrentThread()
        Dim workItem As New KeyValuePair(Of SendOrPostCallback, Object)(Nothing, Nothing)
        While _queue.TryTake(workItem, Timeout.Infinite)
            workItem.Key.Invoke(workItem.Value)
        End While
    End Sub

    Public Sub Complete()
        _queue.CompleteAdding()
    End Sub
End Class

Als het bijhouden van bewerkingen is ingeschakeld, wordt de pomp alleen afgesloten wanneer alle openstaande asynchrone void methoden zijn voltooid, niet alleen de taak op het hoogste niveau.

Praktische overwegingen

  • Impasserisico: Als code in AsyncPump.Run blokken synchroon draait (bijvoorbeeld door .Result of .Wait() aan te roepen op een taak waarvan de voortzetting naar de pomp moet worden teruggestuurd), kan de pompthread die voortzetting niet verwerken. Het resultaat is een impasse. Hetzelfde probleem wordt beschreven in synchrone wrappers voor asynchrone methoden.
  • Prestaties: Een pomp met één thread beperkt de doorvoer tot één thread. Gebruik deze benadering alleen wanneer thread-affiniteit belangrijk is.
  • Platformoverschrijdend: De AsyncPump implementatie die hier wordt weergegeven, maakt gebruik van alleen typen uit de System.Collections.Concurrent en System.Threading naamruimten. Het werkt op alle platforms die .NET ondersteunt.

Zie ook