Executing Tasks Exclusively

——确保任务顺序执行

Posted by eagleboost on May 25, 2025

  优化项目中某项功能时我提出了一个需求,类似于访问WPF的界面控件需要在GUI线程上一样,我希望某些代码在后台线程执行,但同一时间只能干一件事,这样可以简化代码不需要显示使用锁。

  ConcurrentExclusiveSchedulerPair有一个ExclusiveTaskScheduler看起来可以用。

Provides task schedulers that coordinate to execute tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do.

提供任务调度器,协调执行任务,确保并发任务可以同时运行,而独占任务则永远不会同时执行。

  我一开始也这么想,然而事实并非如此。首先我的“某些代码”实际上是几段async/await的异步代码,即一个Task,并非同步执行的Action。而基于ExclusiveTaskScheduler创建的TaskFactory是用来调度一个Action或者Func而不是调度一个Task的执行。如果打开TaskFactory.StartNew()方法的提示,是下面这样:

  当传入Func<Task>后,TaskFactory.StartNew()在调用Func<Task>Task创建出来就立即返回了,并不会等到创建的Task执行完成,因此TaskFactoryRunner只实现了我需要的功能的一半——其实是对前面关于ExclusiveTaskScheduler的文档描述有误解,其中说的tasks指的是某些需要执行的代码,并非.Net中的Task对象。

1
2
3
4
5
6
7
8
9
public class TaskFactoryRunner
{
  private readonly TaskFactory _taskFactory = new(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

  public Task RunAsync(Func<Task> taskFunc)
  {
   return _taskFactory.StartNew(() => taskFunc());
  }
}

  想明白关节后我发现Stephen Cleary在一篇博客中也有类似叙述,并提到Stephen ToubAsyncLock可以用来提供异步锁机制——题外话,Stephen Toub这个系列的Async Coordination Primitives博客文章非常值得读,我在项目中也用到了AsyncLock

Note: When an asynchronous method awaits, it returns back to its context. This means that ExclusiveScheduler is perfectly happy to run one task at a time, not one task until it completes. As soon as an asynchronous method awaits, it’s no longer the “owner” of the ExclusiveScheduler. Stephen Toub’s async-friendly primitives like AsyncLock use a different strategy, allowing an asynchronous method to hold the lock while it awaits.

注意: 异步方法在等待时会返回到其上下文。这意味着 ExclusiveScheduler 非常乐意一次运行一个任务 (而非一个任务直到其完成 )。一旦异步方法开始等待,它就不再是 ExclusiveScheduler 的“拥有者”。Stephen Toub 的异步友好原语如 AsyncLock 采用了不同策略,允许异步方法在等待期间保持锁。

  其实解决办法很简单,只需要调用Wait()方法等待Task执行完成即可:

1
2
3
4
5
6
7
8
9
public class TaskFactoryRunnerWithWait
{
  private readonly TaskFactory _taskFactory = new(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

  public Task RunAsync(Func<Task> taskFunc)
  {
   return _taskFactory.StartNew(() => taskFunc().Wait());
  }
}

  当然使用AsyncLock也能轻松实现我需要的功能,像下面这样:

1
2
3
4
5
6
7
8
9
10
public class TaskRunnerWithAsyncLock
{
  private readonly AsyncLock _asyncLock = new();

  public async Task RunAsync(Func<Task> taskFunc)
  {
    using var async = await _asyncLock.LockAsync().ConfigureAwait(false);
    await taskFunc().ConfigureAwait(false);
  }
}

  到这里问题似乎差不多解决了,但并没有完。我把这两天刚发布的Claude AI 4拉出来问了问。它给出了好几个答案,其中直接能工作的有两个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
////使用TPL Task chainning
public class SequentialTaskExecutor
{
  private Task _lastTask = Task.CompletedTask;
  private readonly object _lock = new object();

  public Task RunAsync(Func<Task> taskFunc)
  {
    lock (_lock)
    {
      _lastTask = _lastTask.ContinueWith(async _ => await taskFunc()).Unwrap();
      return _lastTask;
    }
  }
}

public class TaskRunnerWithSemaphoreSlim
{
  private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

  public async Task RunAsync(Func<Task> taskFunc)
  {
    await _semaphore.WaitAsync();
    try
    {
      await taskFunc().ConfigureAwait(false);
    }
    finally
    {
      _semaphore.Release();
    }
  }
}

  当我问它能否使用ExclusiveTaskScheduler来实现的时候它先是给出了与TaskFactoryRunner初始版本类似的错误答案。我告诉它代码不能正常工作并让它找问题,它听懂并给出了下面的正确代码,非常不错。

1
2
3
4
5
6
7
8
9
public class TaskFactoryRunnerWithGetAwaiter
{
  private readonly TaskFactory _taskFactory = new(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);

  public Task RunAsync(Func<Task> taskFunc)
  {
    return _taskFactory.StartNew(() => taskFunc().GetAwaiter().GetResult());
  }
}

  现在我们有了几个版本的实现,那么该选哪一个呢?回归一下需求会发现TaskRunnerWithAsyncLockTaskRunnerWithSemaphoreSlim并不能直接用,因为需要在“后台线程”执行代码避免阻塞主界面,而它们会在调用线程(通常是主线程)执行,所以需要额外调用Task.Run才行:

1
await Task.Run(taskFunc).ConfigureAwait(false);

  下面是以TaskFactoryRunnerWithGetAwaiter为基准连续调度100Task的测试结果:

  不意外,所有版本的执行效率相差无几。

  TaskRunnerWithAsyncLock通过AsyncLock实现了100%优雅的async/await代码,但AsyncLock本身有开销,而且需要调用Task.Run产生额外内存开销,所以#1#2出局。

  SemaphoreSlim内存开销虽然最小,但是也需要额外调用Task.Run才能保证代码在后台线程运行,所以#3#4也出局。

  SequentialTaskExecutor创建Task Chain有额外开销无法进一步优化也出局。

  最后剩下#5#6两个基于TaskFactory的实现,内存开销也最小。#5使用Task.Wait()当异常发生时会被包装进一个AggregateException,而#6使用Task.GetAwaiter().GetResult()会抛出原始异常,因此#6,也就是基准测试TaskFactoryRunnerWithGetAwaiter胜出。

  需要注意的是TaskFactoryRunnerWithGetAwaiter能用的前提是taskFunc()创建的Task必须始终在后台线程执行,否则会死锁。如果需要处理Task可能切换到主线程执行的情况,最好的办法其实是#4,虽然有额外开销,但不会阻塞调用线程。