C# 高级:TAP 异步编程

共 7904字,需浏览 16分钟

 ·

2021-12-10 10:38

.NET大牛之路 • 王亮@精致码农 • 2021.10.12

我们的应用程序广泛使用文件和网络 I/O 操作,I/O 相关 API 传统上默认是阻塞的,导致用户体验和硬件利用率不佳,此类问题的编码难度也较大。

解决此类问题需要使用异步编程,异步强调的是非阻塞,是一种编程模式,主要解决了因文件、网络等 I/O 操作阻塞主线程工作的问题,比如阻塞期间 UI 无法响应问题。

而异步编程又可以借助多线程技术来解决。前面我们讲了基于 System.Threading 命名空间的多线程编程,该命名空间提供的类型是直接和线程相关的 API,虽然可以用来实现异步操作,但有些繁琐。随着 .NET 的发展,.NET 对多线程编程相继做了进一步的抽象封装,引入了 System.Threading.Tasks 命名空间,使多线程异步编程更简单易懂。

异步编程主要有如下用途:

  • 在等待 I/O 请求返回的过程中,通过让出线程使其能处理更多的服务器请求。

  • 在等待 I/O 请求时让出线程使其继续进行 UI 交互,并将需要长时间运行的工作过渡到其他 CPU 线程,使用户界面的响应性更强。

使用 .NET 基于 Task 的异步模型可以直接编写 I/O 受限和 CPU 受限的异步代码。该模型围绕着 TaskTask 类型以及 C# 的 asyncawait 关键字展开。本文将讲解如何使用 .NET 异步编程及一些常见的异步编程操作。

1Task 和 Task

Task 是 Promise 模型的实现。简单说,它给出“承诺(Promise)”:会在稍后完成工作。而 .NET 的 Task 是为了简化使用“Promise”而设计的 API。

Task 表示不返回值的操作,Task 表示返回 T 类型的值的操作。

重要的是要把 Task 理解为发起异步工作的抽象,而不是对线程的抽象。默认情况下,Task 在当前线程上执行,并酌情将工作委托给操作系统。可以选择通过 Task.Run API 明确要求任务在单独的线程上运行。

Task 提供了一个 API 协议,用于监视、等待和访问任务的结果值。比如,通过 await 关键字等待任务执行完成,为使用 Task 提供了更高层次的抽象。

使用 await 允许你在任务运行期间执行其它有用的工作,将线程的控制权交给其它调用者,直到自己的任务完成。你不再需要依赖回调或事件来在任务完成后继续执行后续工作。

2Task 的状态

虽然实际 TAP 编程中很少使用到 Task 的状态,但它是很多异步操作机理的基础。Task 类为异步操作提供了一个生命周期,这个周期由 TaskStatus 枚举表示,它有如下值:

public enum TaskStatus
{
Created = 0,
WaitingForActivation = 1,
WaitingToRun = 2,
Running = 3,
WaitingForChildrenToComplete = 4,
RanToCompletion = 5,
Canceled = 6,
Faulted = 7
}

其中 CanceledFaultedRanToCompletion 状态一起被认为是任务的最终状态。因此,如果任务处于最终状态,则其 IsCompleted 属性为 true 值。

3I/O 受限异步操作

下面示例代码演示了一个典型的异步 I/O 调用操作:

public Task<string> GetHtmlAsync()
{
// 此处是同步执行
var client = new HttpClient();
return client.GetStringAsync("https://www.dotnetfoundation.org");
}

这个例子调用了一个异步方法,并返回了一个活动的 Task,它很可能还没有完成。

下面第二个代码示例增加了asyncawait关键字对任务进行操作:

public async Task<string> GetFirstCharactersCountAsync(string url, int count)
{
// 此处是同步执行
var client = new HttpClient();

// 此处 await 挂起代码的执行,把控制权交出去(线程可以去做别的事情)
var page = await client.GetStringAsync("https://www.dotnetfoundation.org");

// 任务完成后恢复了控制权,继续执行后续代码
// 此处回到了同步执行

if (count > page.Length)
{
return page;
}
else
{
return page.Substring(0, count);
}
}

使用 await 关键字告诉当前上下文赶紧生成快照并交出控制权,异步任务执行完成后会带着返回值去线程池排队等待可用线程,等到可用线程后,恢复上下文,线程继续执行后续代码。

GetStringAsync() 方法的内部通过底层 .NET 库调用资源(也许会调用其他异步方法),一直到 P/Invoke 互操作调用本地(Native)网络库。本地库随后可能会调用到一个系统 API(如 Linux 上 Socket 的write()API)。Task 对象将通过层层传递,最终返回给初始调用者。

在整个过程中,关键的一点是,没有一个线程是专门用来处理任务的。虽然工作是在某种上下文中执行的(操作系统确实要把数据传递给设备驱动程序并中断响应),但没有线程专门用来等待请求的数据回返回。这使得系统可以处理更大的工作量,而不是干等着某个 I/O 调用完成。

虽然上面的工作看似很多,但与实际 I/O 工作所需的时间相比,简直微不足道。用一条不太精确的时间线来表示,大概是这样的:

0-1--------------------2-3

01所花费的时间是await交出控制权之前所花的时间。从12花费的时间是GetStringAsync方法花费在 I/O 上的时间,没有 CPU 成本。最后,从23花费的时间是上下文重新获取控制权后继续执行的时间。

4CPU 受限异步操作

CPU 受限的异步代码与 I/O 受限的异步代码有些不同。因为工作是在 CPU 上完成的,所以没有办法绕开专门的线程来进行计算。使用 async 和 await 只是为你提供了一种干净的方式来与后台线程进行交互。请注意,这并不能为共享数据提供加锁保护,如果你正在使用共享数据,仍然需要使用适当的同步策略。

下面是一个 CPU 受限的异步调用:

public async Task<int> CalculateResult(InputData data)
{
// 在线程池排队获取线程来处理任务
var expensiveResultTask = Task.Run(() => DoExpensiveCalculation(data));

// 此时此处,你可以并行地处理其它工作

var result = await expensiveResultTask;

return result;
}

CalculateResult方法在它被调用的线程(一般可以定义为主线程)上执行。当它调用Task.Run时,会在线程池上排队执行 CPU 受限操作 DoExpensiveCalculation,并接收一个Task句柄。DoExpensiveCalculation会在下一个可用的线程上并行运行,很可能是在另一个 CPU 核上。和 I/O 受限异步调用一样,一旦遇到awaitCalculateResult的控制权就会被交给它的调用者,这样在DoExpensiveCalculation返回结果的时候,结果就会被安排在主线程上排队运行。

对于开发者,CPU 受限和 I/O 受限的在调用方式上没什么区别。区别在于所调用资源性质的不同,不必关心底层对不同资源的调用的具体逻辑。编写代码需要考虑的是,对于 CPU 受限的异步任务,根据实际情况考虑是否需要使其和其它任务并行执行,以加快程序的整体运行时间。

5异步编程模式

最后简单回顾一下 .NET 历史上提供的三种执行异步操作的模式。

  • 基于任务的异步模式(Task-based Asynchronous Pattern,TAP),它使用单一的方法来表示异步操作的启动和完成。TAP 是在 .NET Framework 4 中引入的。它是 .NET 中异步编程的推荐方法。C# 中的 async 和 await 关键字为 TAP 添加了语言支持。

  • 基于事件的异步模式(Event-based Asynchronous Pattern,EAP),这是基于事件的传统模式,用于提供异步行为。它需要一个具有 Async 后缀的方法和一个或多个事件。EAP 是在 .NET Framework 2.0 中引入的。它不再被推荐用于新的开发。

  • 异步编程模式(Asynchronous Programming Model,APM)模式,也称为 IAsyncResult 模式,这是使用 IAsyncResult 接口提供异步行为的传统模式。在这种模式中,需要BeginEnd方法同步操作(例如,BeginWriteEndWrite来实现异步写操作)。这种模式也不再推荐用于新的开发。

下面简单举例对三种模式进行比较。

假设有一个 Read 方法,该方法从指定的偏移量开始将指定数量的数据读入提供的缓冲区:

public class MyClass
{
public int Read(byte [] buffer, int offset, int count);
}

若用 TAP 异步模式来改写,该方法将是简单的一个 ReadAsync 方法:

public class MyClass
{
public Task<int> ReadAsync(byte [] buffer, int offset, int count);
}

若使用 EAP 异步模式,需要额外多定义一些类型和成员:

public class MyClass
{
public void ReadAsync(byte [] buffer, int offset, int count);
public event ReadCompletedEventHandler ReadCompleted;
}

public delegate void ReadCompletedEventHandler(
object sender, ReadCompletedEventArgs e);

public class ReadCompletedEventArgs : AsyncCompletedEventArgs
{
public MyReturnType Result { get; }
}

若使用 AMP 异步模式,则需要定义两个方法,一个用于开始执行异步操作,一个用于接收异步操作结果:

public class MyClass
{
public IAsyncResult BeginRead(
byte [] buffer, int offset, int count,
AsyncCallback callback, object state);
public int EndRead(IAsyncResult asyncResult);
}

后两种异步模式已经过时不推荐使用了,这里也不再继续探讨。年长的 .NET 程序员可能比较熟悉后两种异步模式,毕竟那时候没有 async/await,应该没少折腾。

下面来介绍几个常见的基于 TAP 的异步操作。

6手动控制任务启动

为了支持手动控制任务启动,并支持构造与调用的分离,Task 类提供了一个 Start 方法。由 Task 构造函数创建的任务被称为冷任务,因为它们的生命周期处于 Created 状态,只有该实例的 Start 方法被调用才会启动。

任务状态平时用的情况不多,一般我们在封装一个任务相关的方法时,可能会用到。比如下面这个例子,需要判断某任务满足一定条件才启动:

static void Main(string[] args)
{
MyTask t = new(() =>
{
// do something.
});

StartMyTask(t);

Console.ReadKey();
}

public static void StartMyTask(MyTask t)
{
if (t.Status == TaskStatus.Created && t.Counter>10)
{
t.Start();
}
else
{
// 这里模拟计数业务代码,直到 Counter>10 再执行 Start
while (t.Counter <= 10)
{
// Do something
t.Counter++;
}
t.Start();
}
}

public class MyTask : Task
{
public MyTask(Action action) : base(action)
{
}

public int Counter { get; set; }
}

同样,TaskStatus.Created 状态以外的状态,我们叫它热任务,热任务一定是被调用了 Start 方法激活过的。

7确保任务已激活

注意,所有从 TAP 方法返回的任务都必须被激活,比如下面这样的代码:

MyTask task = new(() =>
{
Console.WriteLine("Do something.");
});

// 在其它地方调用
await task;

await 之前,任务没有执行 Task.Start 激活,await 时程序就会一直等待下去。所以如果一个 TAP 方法内部使用 Task 构造函数来实例化要返回的 Task,那么 TAP 方法必须在返回 Task 对象之前对其调用 Start

8任务取消

在 TAP 中,取消对于异步方法实现者和消费者来说都是可选的。如果一个操作允许取消,它就会暴露一个异步方法的重载,该方法接受一个取消令牌(CancellationToken 实例)。按照惯例,参数被命名为 cancellationToken。例如:

public Task ReadAsync(
byte [] buffer, int offset, int count,
CancellationToken cancellationToken)

异步操作会监控这个令牌是否有取消请求。如果收到取消请求,它可以选择取消操作,如下面的示例通过 while 来监控令牌的取消请求:

static void Main(string[] args)
{
CancellationTokenSource source = new();
CancellationToken token = source.Token;

var task = DoWork(token);

// 实际情况可能是在稍后的其它线程请求取消
Thread.Sleep(100);
source.Cancel();

Console.WriteLine($"取消后任务返回的状态:{task.Status}");

Console.ReadKey();
}

public static Task DoWork(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
// Do something.
Thread.Sleep(1000);

return Task.CompletedTask;
}
return Task.FromCanceled(cancellationToken);
}

如果取消请求导致工作提前结束,甚至还没有开始就收到请求取消,则 TAP 方法返回一个以 Canceled 状态结束的任务,它的 IsCompleted 属性为 true,且不会抛出异常。当任务在 Canceled 状态下完成时,任何在该任务注册的延续任务仍都会被调用和执行,除非指定了诸如 NotOnCanceled 这样的选项来选择不延续。

但是,如果在异步任务在工作时收到取消请求,异步操作也可以选择不立刻结束,而是等当前正在执行的工作完成后再结束,并返回 RanToCompletion 状态的任务;也可以终止当前工作并强制结束,根据实际业务情况和是否生产异常结果返回 CanceledFaulted 状态。

对于不能被取消的业务方法,不要提供接受取消令牌的重载,这有助于向调用者表明目标方法是否可以取消。

9进度报告

几乎所有异步操作都可以提供进度通知,这些通知通常用于用异步操作的进度信息更新用户界面。

在 TAP 中,进度是通过 IProgress 接口来处理的,该接口作为一个参数传递给异步方法。下面是一个典型的的使用示例:

static void Main(string[] args)
{
var progress = new Progress<int>(n =>
{
Console.WriteLine($"当前进度:{n}%");
});

var task = DoWork(progress);

Console.ReadKey();
}

public static async Task DoWork(IProgress<int> progress)
{
for (int i = 1; i <= 100; i++)
{
await Task.Delay(100);
if (i % 10 == 0)
{
progress?.Report(i);
};
}
}

输出如下结果:

当前进度:10%
当前进度:20%
当前进度:30%
当前进度:40%
当前进度:50%
当前进度:60%
当前进度:70%
当前进度:80%
当前进度:90%
当前进度:100%

IProgress 接口支持不同的进度实现,这是由消费代码决定的。例如,消费代码可能只关心最新的进度更新,或者希望缓冲所有更新,或者希望为每个更新调用一个操作,等等。所有这些选项都可以通过使用该接口来实现,并根据特定消费者的需求进行定制。例如,如果本文前面的 ReadAsync 方法能够以当前读取的字节数的形式报告进度,那么进度回调可以是一个 IProgress 接口。

public Task ReadAsync(
byte[] buffer, int offset, int count,
IProgress<long> progress)

再如 FindFilesAsync 方法返回符合特定搜索模式的所有文件列表,进度回调可以提供工作完成的百分比和当前部分结果集,它可以用一个元组来提供这个信息。

public Task> FindFilesAsync(
string pattern,
IProgressdouble, ReadOnlyCollection>>> progress)

或使用 API 特有的数据类型:

public Task> FindFilesAsync(
string pattern,
IProgress progress)

如果 TAP 的实现提供了接受 IProgress 参数的重载,它们必须允许参数为空,在这种情况下,不会报告进度。IProgress 实例可以作为独立的对象,允许调用者决定如何以及在哪里处理这些进度信息。

10Task.Yield 让步

我们先来看一段 Task.Yield() 的代码:

Task.Run(async () =>
{
for(int i=0; i<10; i++)
{
await Task.Yield();
...
}
});

这里的 Task.Yield() 其实什么也没干,它返回的是一个空任务。那 await 一个什么也没做的空任务有什么用呢?

我们知道,对计算机来说,任务调度是根据一定的优先策略来安排线程去执行的。如果任务太多,线程不够用,任务就会进入排队状态。而 Yield 的作用就是让出等待的位置,让后面排除的任务先行。它字面上的意思就是让步,当任务做出让步时,其它任务就可以尽快被分配线程去执行。举个现实生活中的例子,就像你在排队办理业务时,好不容易到你了,但你的事情并不急,自愿让出位置,让其他人先办理,自己假装临时有事到外面溜一圈什么事也没干又回来重新排队。默默地做了一次大善人。

Task.Yield() 方法就是在异步方法中引入一个让步点。当代码执行到让步点时,就会让出控制权,去线程池外面兜一圈什么事也没干再回来重新排队。

11定制异步任务后续操作

我们可以对异步任务执行完成的后续操作进行定制。常见的两个方法是 ConfigureAwaitContinueWith

ConfigureAwait

我们先来看一段 Windows Form 中的代码:

private void button1_Click(object sender, EventArgs e)
{
var content = CurlAsync().Result;
...
}

private async Task<string> CurlAsync()
{
using (var client = new HttpClient())
{
return await client.GetStringAsync("http://geekgist.com");
}
}

想必大家都知道 CurlAsync().Result 这句代码在 Windows Form 程序中会造成死锁。原因是 UI 主线程执行到这句代码时,就开始等待异步任务的结果,处于阻塞状态。而异步任务执行完后回来准备找 UI 线程继续执行后面的代码时,却发现 UI 线程一直处于“忙碌”的状态,没空搭理回来的异步任务。这就造成了你等我,我又在等你的尴尬局面。

当然,这种死锁的情况只会在 Winform 和早期的 ASP.NET WebForm 中才会发生,在 Console 和 Web API 应用中不会生产死锁。

解决办法很简单,作为异步方法调用者,我们只需改用 await 即可:

private async void button1_Click(object sender, EventArgs e)
{
var content = await CurlAsync();
...
}

在异步方法内部,我们也可以调用任务的 ConfigureAwait(false) 方法来解决这个问题。如:

private async Task<string> CurlAsync()
{
using (var client = new HttpClient())
{
return await client
.GetStringAsync("http://geekgist.com")
.ConfigureAwait(false);
}
}

虽然两种方法都可行,但如果作为异步方法提供者,比如封装一个通用库时,考虑到难免会有新手开发者会使用 CurlAsync().Result,为了提高通用库的容错性,我们就可能需要使用 ConfigureAwait 来做兼容。

ConfigureAwait(false) 的作用是告诉主线程,我要去远行了,你去做其它事情吧,不用等我。只要先确保一方不在一直等另一方,就能避免互相等待而造成死锁的情况。

ContinueWith

ContinueWith 方法很容易理解,就是字面上的意思。作用是在异步任务执行完成后,安排后续要执行的工作。示例代码:

private void Button1_Click(object sender, EventArgs e)
{
var backgroundScheduler = TaskScheduler.Default;
var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
Task.Factory
.StartNew(_ => DoBackgroundComputation(), backgroundScheduler)
.ContinueWith(_ => UpdateUI(), uiScheduler)
.ContinueWith(_ => DoAnotherBackgroundComputation(), backgroundScheduler)
.ContinueWith(_ => UpdateUIAgain(), uiScheduler);
}

如上,可以一直链式的写下去,任务会按照顺序执行,一个执行完再继续执行下一个。若其中一个任务返回的状态是 Canceled 时,后续的任务也将被取消。这个方法有好些个重载,在实际用到的时候再查看文档即可。

12小结

System.Threading.Tasks 命名空间中关键的一个类是 Task 类,基于 Task 的异步 API 和语言级异步编程模式颠覆了传统模式,使得异步编程非常简单。它使我们可以只关注业务层面要处理的任务,而不必关心和使用线程或线程池。重要的是要把 Task 理解为发起异步工作的抽象,而不是对线程的抽象。本文还介绍了 .NET 异步编程模式,而我们现在主流用的都是 TAP 模式,最后本文罗列一些常见的异步操作。

浏览 22
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报