From BeginInvoke to async/await

Convert BeginInvoke to async/await

Posted by eagleboost on February 21, 2020

The problem

Dispatcher.BeginInvoke is one of the most widely used methods in WPF applications. It typically serves two scenarios:

  1. Cross-thread scheduling - When a calculation completes on a background thread, we need to notify the GUI so that it refreshes and picks up the new data, for example to display weather information returned by a network weather service.
  2. Delayed execution - To keep the main GUI responsive to user input, we sometimes dispatch calls on the main GUI thread with a lower priority.

BeginInvoke has quite a few overloads. The two used most often are:

public DispatcherOperation BeginInvoke(Action action)
public DispatcherOperation BeginInvoke(Action action, DispatcherPriority priority)

It is simple and straightforward to use, but it comes with some unavoidable problems:

  1. Not unit test friendly - Writing unit tests for a ViewModel that explicitly uses Dispatcher is troublesome. On one hand, an STA thread is needed for the Dispatcher to be created, yet most tests have no need to verify the delayed-execution behavior, and creating STA threads puts unnecessary pressure on the test environment. On the other hand, even with a Dispatcher available, testing the delayed-execution behavior still takes extra work.

  2. Dispatched calls cannot be canceled - Calling BeginInvoke with an Action is similar to creating a Task, but the Action cannot be canceled the way a Task can by passing a CancellationToken. If an Action is dispatched by calling BeginInvoke n times, it eventually executes n times, which may not be what we want in some cases.

  3. The callback style does not read as smoothly as async/await code.

Analysis

First let’s look at problem #3. The signature of BeginInvoke shows that it returns a DispatcherOperation, and DispatcherOperation is awaitable, so the code below is valid:

await Dispatcher.BeginInvoke(() => DoSomething());

However, BeginInvoke does not help when a return value is needed. The good news is that .NET Framework 4.5 added an InvokeAsync method to Dispatcher that supports both a return value and a CancellationToken, so problem #2 is no longer an issue.

public DispatcherOperation<TResult> InvokeAsync<TResult>(
  Func<TResult> callback,
  DispatcherPriority priority,
  CancellationToken ct)
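
As a rough illustration of that overload, the sketch below reads a value on the Dispatcher thread and lets the caller cancel the request. The class and method names (InvokeAsyncExample, ReadOnDispatcherAsync) are made up for this example, and returning null on cancellation is only one possible convention.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Threading;

public static class InvokeAsyncExample
{
  // Hypothetical helper: runs 'read' on the Dispatcher thread at Background priority
  // and hands the result back to the awaiting caller.
  public static async Task<string> ReadOnDispatcherAsync(
    Dispatcher dispatcher, Func<string> read, CancellationToken ct)
  {
    try
    {
      // DispatcherOperation<string> is awaitable and yields the callback's return value.
      return await dispatcher.InvokeAsync(read, DispatcherPriority.Background, ct);
    }
    catch (OperationCanceledException)
    {
      // The token was canceled before the callback started running.
      // Returning null here is an assumption made for this sketch.
      return null;
    }
  }
}
```

Note that cancellation only takes effect while the operation is still waiting in the queue. Once the callback has started, the token no longer stops it.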

Now we can go back to problem #1, unit testing. In general, an explicit dependency on Dispatcher should be avoided in order to simplify unit tests. One approach is to abstract an IDispatcher interface with a BeginInvoke method and then mock the interface in tests. However, this approach requires extra setup on the mock to handle the callback. Different mocking frameworks do this in different ways, but in the end it is not straightforward to verify that a call passed to BeginInvoke was actually invoked.

Since the callback style of BeginInvoke is not unit test friendly, we have to find another way. When developers call BeginInvoke, for example at ApplicationIdle priority, what they want is "send this Action to the ApplicationIdle job queue and execute it when the Dispatcher starts to handle that queue". That is equivalent to "let me know when the Dispatcher starts to handle the ApplicationIdle job queue, and then I will execute this Action".

Separating the wait from the execution simplifies the problem immediately. Suppose we define an interface like this:

public interface IDispatcherWaiter
{
  ////ct is optional so that callers without a token can omit it
  Task<TaskStatus> WaitAsync(DispatcherPriority priority, CancellationToken ct = default(CancellationToken));
}

Usage would be like this:

private async Task DoSomethingAsync()
{
  ////waiter is an instance of IDispatcherWaiter
  await waiter.WaitAsync(DispatcherPriority.ApplicationIdle);
  DoSomething();
}

private async Task DoSomethingAsync(CancellationToken ct)
{
  ////waiter is an instance of IDispatcherWaiter
  var status = await waiter.WaitAsync(DispatcherPriority.ApplicationIdle, ct);
  if (status == TaskStatus.RanToCompletion)
  {
    DoSomething();
  }  
}

In the unit test we just need to mock an IDispatcherWaiter, or simply provide a reusable TestDispatcherWaiter whose WaitAsync method returns TaskStatus.RanToCompletion directly.
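
A minimal sketch of such a TestDispatcherWaiter might look like the following. The interface is repeated so the snippet compiles on its own. Honoring an already-canceled token is an extra assumption, not something the description above requires.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Threading;

// Repeated from above so that the snippet is self-contained.
public interface IDispatcherWaiter
{
  Task<TaskStatus> WaitAsync(DispatcherPriority priority, CancellationToken ct = default(CancellationToken));
}

// Hypothetical test double: no Dispatcher and no STA thread involved.
public sealed class TestDispatcherWaiter : IDispatcherWaiter
{
  public Task<TaskStatus> WaitAsync(DispatcherPriority priority, CancellationToken ct = default(CancellationToken))
  {
    // Completes immediately, so the awaiting code continues synchronously in the test.
    var status = ct.IsCancellationRequested ? TaskStatus.Canceled : TaskStatus.RanToCompletion;
    return Task.FromResult(status);
  }
}
```

A ViewModel that receives an IDispatcherWaiter through its constructor can then be tested on any thread, because the awaited task is already complete when it is returned.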

The implementation also seems simple enough. Just await InvokeAsync and pass it the CancellationToken:

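public class DispatcherWaiter : IDispatcherWaiter
{
  private readonly Dispatcher _dispatcher;

  public DispatcherWaiter(Dispatcher dispatcher)
  {
    _dispatcher = dispatcher;
  }

  public async Task<TaskStatus> WaitAsync(DispatcherPriority priority, CancellationToken ct = default(CancellationToken))
  {
    ////If ct is canceled while the operation is still queued, this await throws an OperationCanceledException
    return await _dispatcher.InvokeAsync(() => GetResult(ct), priority, ct);
  }

  private static TaskStatus GetResult(CancellationToken ct)
  {
    return ct.IsCancellationRequested ? TaskStatus.Canceled : TaskStatus.RanToCompletion;
  }
}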

More Analysis

The await InvokeAsync approach above seems to work, but it has one critical defect.

When awaiting a Task, ConfigureAwait(true|false) controls whether the continuation is marshaled back to the originally captured context. True is the default behavior: marshal back to the original context.

Awaiting a DispatcherOperation behaves the same way as ConfigureAwait(true), so for DoSomething() to be executed on the main GUI thread, DoSomethingAsync() needs to be called on the main GUI thread too.

If DoSomethingAsync() is called on some worker thread X as shown below, DoSomething() resumes in the context captured on thread X once WaitAsync completes, so it is not guaranteed to run on the Dispatcher thread. This may behave differently from BeginInvoke(()=> DoSomething()), because the callback of BeginInvoke is always invoked on the Dispatcher thread.

private Task DoSomethingFirstAsync()
{
  return Task.Run(()=>
  {
    ////This is on worker thread X
    DoSomethingAsync();
  });
}

The equivalent of BeginInvoke(()=> DoSomething()) should behave like this (see the comments):

private async Task DoSomethingAsync()
{
  ////The current thread can be any thread
  await waiter.WaitAsync(DispatcherPriority.ApplicationIdle);
  ////The current thread should be the thread of the Dispatcher that is held by the waiter
  DoSomething();
}

In short, awaiting Dispatcher.InvokeAsync directly would not work.

Solution

Fortunately, the design of .NET allows us to implement custom awaitable objects. The blog post "await anything" by Microsoft software engineer Stephen Toub describes everything you need to know about writing a custom awaitable.

In the DispatcherWaiter implementation below, when await waiter.WaitAsync completes, execution stays on the Dispatcher thread instead of being marshaled back to the originally captured context, which ensures that the code after the await runs on the expected thread.

////First, define a generic IAwaitable interface, objects implement this interface would be awaitable
public interface IAwaitable<out T, out TResult> : INotifyCompletion where T : class
{
  T GetAwaiter();

  TResult GetResult();

  bool IsCompleted { get; }
}

////Second, define IDispatcherWaiter to extend IAwaitable
public interface IDispatcherWaiter : IAwaitable<IDispatcherWaiter, TaskStatus>
{
  /// <summary>
  /// Returns Dispatcher.CheckAccess
  /// </summary>
  /// <returns></returns>
  bool CheckAccess();
  
  /// <summary>
  /// Calls Dispatcher.VerifyAccess
  /// </summary>
  /// <returns></returns>
  void VerifyAccess();

  /// <summary>
  /// Returns immediately if the caller is already on the Dispatcher thread, otherwise dispatches to the Dispatcher thread
  /// </summary>
  /// <returns></returns>
  IDispatcherWaiter CheckedWaitAsync();

  /// <summary>
  /// Specify DispatcherPriority and allows cancellation
  /// </summary>
  /// <param name="priority"></param>
  /// <param name="ct"></param>
  /// <returns></returns>  
  IDispatcherWaiter WaitAsync(DispatcherPriority priority = DispatcherPriority.Normal, CancellationToken ct = default(CancellationToken));
}

////Last, the implementation details
public class DispatcherWaiter : IDispatcherWaiter
{
  private readonly Dispatcher _dispatcher;
  private readonly DispatcherPriority _priority = DispatcherPriority.Normal;
  private readonly CancellationToken _ct;
  private bool _isCompleted;

  public DispatcherWaiter(Dispatcher d)
  {
    _dispatcher = d;
  }

  private DispatcherWaiter(Dispatcher d, DispatcherPriority priority, CancellationToken ct)
  {
    if (priority == DispatcherPriority.Send)
    {
      throw new InvalidOperationException("Send priority is not allowed");
    }

    if (priority <= DispatcherPriority.Inactive)
    {
      throw new InvalidOperationException(priority.ToString() + " priority is not allowed");
    }

    _dispatcher = d;
    _priority = priority;
    _ct = ct;
  }

  public IDispatcherWaiter GetAwaiter()
  {
    return this;
  }
  
  public TaskStatus GetResult()
  {
    return _ct.IsCancellationRequested ? TaskStatus.Canceled : TaskStatus.RanToCompletion;
  }

  public bool IsCompleted
  {
    get { return _isCompleted; }
  }

  public void OnCompleted(Action continuation)
  {
    if (_ct.IsCancellationRequested)
    {
      continuation();
    }
    else
    {
      try
      {
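        ////Note: when called from a thread other than the Dispatcher thread, the operation may finish before
        ////the handlers below are attached; production code may want to guard against that race
        var op = _dispatcher.InvokeAsync(() => { }, _priority, _ct);
        op.Completed += (s, e) => continuation();
        op.Aborted += (s, e) => continuation();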
      }
      catch (Exception)
      {
        continuation();
      }
    }
  }

  public bool CheckAccess()
  {
    return _dispatcher.CheckAccess();
  }

  public void VerifyAccess()
  {
    _dispatcher.VerifyAccess();
  }

  public IDispatcherWaiter CheckedWaitAsync()
  {
    return new DispatcherWaiter(_dispatcher) {_isCompleted = _dispatcher.CheckAccess()};
  }

  public IDispatcherWaiter WaitAsync(DispatcherPriority priority = DispatcherPriority.Normal, CancellationToken ct = default(CancellationToken))
  {    
    return new DispatcherWaiter(_dispatcher, priority, ct);
  }
}
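
For unit tests, a stand-in for this awaitable version could be sketched as follows. TestDispatcherWaiter is a hypothetical name, and the class relies on the IAwaitable and IDispatcherWaiter interfaces defined above rather than repeating them. It reports itself as already completed, so no Dispatcher or STA thread is needed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Threading;

// Hypothetical test double; compile it together with the interfaces defined above.
public sealed class TestDispatcherWaiter : IDispatcherWaiter
{
  private readonly CancellationToken _ct;

  public TestDispatcherWaiter() { }

  private TestDispatcherWaiter(CancellationToken ct) { _ct = ct; }

  public IDispatcherWaiter GetAwaiter() { return this; }

  public TaskStatus GetResult()
  {
    return _ct.IsCancellationRequested ? TaskStatus.Canceled : TaskStatus.RanToCompletion;
  }

  // Always completed, so an await continues synchronously on the calling thread.
  public bool IsCompleted { get { return true; } }

  // Only reached when used in callback style; runs the continuation inline.
  public void OnCompleted(Action continuation) { continuation(); }

  // The test double pretends every thread is the Dispatcher thread.
  public bool CheckAccess() { return true; }

  public void VerifyAccess() { }

  public IDispatcherWaiter CheckedWaitAsync() { return this; }

  public IDispatcherWaiter WaitAsync(DispatcherPriority priority = DispatcherPriority.Normal, CancellationToken ct = default(CancellationToken))
  {
    return new TestDispatcherWaiter(ct);
  }
}
```

Because IsCompleted is always true, code such as await waiter.WaitAsync(...) runs straight through in a test, and passing an already-canceled token exercises the TaskStatus.Canceled branch.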

Conclusion

Now all three problems listed above can be considered resolved. Wherever Dispatcher.BeginInvoke is needed, replace it with IDispatcherWaiter.WaitAsync for a smoother coding experience as well as easier unit tests.

To recap the usages:

////async/await style
private async Task DoSomethingAsync(CancellationToken ct)
{
  var status = await waiter.WaitAsync(DispatcherPriority.ApplicationIdle, ct);
  if (status == TaskStatus.RanToCompletion)
  {
    DoSomething();
  }
}

////callback style similar to BeginInvoke()
private void DoSomething(CancellationToken ct)
{
  waiter.WaitAsync(DispatcherPriority.ApplicationIdle, ct).OnCompleted(() =>
  {
    if (!ct.IsCancellationRequested)
    {
      DoSomething();
    }
  });
}

Extension methods can also be used to further simplify the code and improve readability:

public static class DispatcherWaiterExt
{
  public static IDispatcherWaiter WaitForAppIdleAsync(this IDispatcherWaiter waiter, CancellationToken ct = default(CancellationToken))
  {
    return waiter.WaitAsync(DispatcherPriority.ApplicationIdle, ct);
  }
}

private async Task DoSomethingAsync(CancellationToken ct)
{
  if (await waiter.WaitForAppIdleAsync(ct) == TaskStatus.RanToCompletion)
  {
    DoSomething();
  }
}

P.S. Visit GitHub for the source code. Here are the test results:
