如何运用并行编程Parallel提升任务执行效率

共 8209字,需浏览 17分钟

 ·

2021-06-09 12:18

本文来自小易,【DoTNET技术圈】公众号已获得转载授权。


《.NET并发变成实战》读后感:并行编程Parallel

手打目录:

一、前言

二、任务并行库(TPL)的介绍

三、Parallel.Invoke的使用

四、Parallel.For的使用

五、Parallel.ForEach+Partitioner的使用

六、指定最大并行度MaxDegreeOfParallelism

七、退出循环以及捕捉异常

八、参考的资料

一、前言

背景:在物联网场景下,由于数据吞吐量较大,常规的Task异步执行存在明显的性能瓶颈,后通过参考Riccardo Terrel(里卡尔多Dian·特雷尔)著,叶伟民老师翻译的《.NET并发编程实战》,使用了Parallel并行编程,以及分区器Partitioner,将两者结合使用提高了设备数据绑定及数据更新速度,也做到了对CPU的性能比较极致使用。

萌新记录,大佬多加斧正!

可跳过概念,直接抵达使用实例>五、Parallel.ForEach+Partitioner的使用

并行编程的原理

在《.net并发编程实战》(以下称《实战》)中这样解释并行编程——同时执行多个任务。

从开发人员的角度看,当我们考虑这些问题是,“我的程序可以同时执行多项操作吗?”或“我的程序如何更快地解决一个问题”我们会想到并行。并行是指同时在不同的内核上执行多个任务,以提高应用程序的速度,这需要硬件支持(多核),且并行只能在多核设备中实现,是提高程序性能和吞吐量的手段。

并行与并发编程简单区分

1、 并发编程一次处理多个操作,不需要硬件支持(使用一个或多个内核)。

2、 并行编程在多个CPU或多个内核上同时执行多个操作。所有并行程序都是并发的,同时运行的,但并非所有并发都是并行的。原因是并行只能在多核设备上实现。

3、 多任务同时执行来自不同进程的多个线程。多任务并不一定意味着并行执行,只有在使用多个CPU 或多个内核时才能实现并行执行。

 

为什么需要使用并行编程

实战不同程序CPU使用资源使用的程度做了一个对比: 

 

《实战》中认为,在一台多核计算机上运行一个没有考虑到并发的应用程序,就是在浪费计算机的生产力,因为应用程序在顺序处理过程中只能使用一部分可用的计算能力,在这种情况下任何CPU性能计数器会发现只有一个内核运行得很快,可能为100%,而其他内核未充分利用或空闲,在上图的8内核的计算机中,运行的非并行程序意味着资源的总体使用率可能不到15%。

使用并行编程的两种方式

1、 任务并行库(TPL),本文中只使用了这种方式

2、 并行LINQ(PLINQ)—》官方文档直达:https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/introduction-to-plinq

二、任务并行库(TPL)的并行介绍

.Net Framework4 引入了新的Task Parallel Library(任务并行库,TPL),它支持数据并行、任务并行和流水线。

当并行循环运行时,TPL会将数据源按照内置的分区算法(或者你可以自定义一个分区算法)将数据划分为多个不相交的子集,然后,从线程池中选择线程并行地处理这些数据子集,每个线程只负责处理一个数据子集。在后台,任务计划程序将根据系统资源和工作负荷来对任务进行分区。如有可能,计划程序会在工作负荷变得不平衡的情况下在多个线程和处理器之间重新分配工作。

在对任何代码(包括循环)进行并行化时,一个重要的目标是利用尽可能多的处理器,但不要过度并行化到使行处理的开销让任何性能优势消耗殆尽的程度。比如:对于嵌套循环,只会对外部循环进行并行化,原因是不会在内部循环中执行太多工作。少量工作和不良缓存影响的组合可能会导致嵌套并行循环的性能降低。

由于循环体是并行运行的,迭代范围的分区是根据可用的逻辑内核数、分区大小以及其他因素动态变化的,因此无法保证迭代的执行顺序。

    TPL引入了System.Threading.Tasks ,主类是Task,这个类表示一个异步的并发的操作,然而我们不一定要使用Task类的实例,可以使用Parallel静态类。它提供了Parallel.Invoke Parallel.ForParallel.Forecah 三个方法以下分别介绍3个方法的简单实例,每个方法都有多个重载,可自行查看源代码

三、Parallel.Invoke的使用

        

static void Main()        {            try            {                Parallel.Invoke(                    BasicAction,// Param #0 - 静态方法                    () =>// Param #1 - lambda表达式                    {                        Console.WriteLine("干饭人干饭, Thread={0}", Thread.CurrentThread.ManagedThreadId);                    },                    delegate ()// Param #2 -  委托                    {                        Console.WriteLine("委托方法中, Thread={0}", Thread.CurrentThread.ManagedThreadId);                    }                );            }            // 在本例中不期望出现异常,但如果任务中仍然抛出异常,            // 它将被包装在AggregateException中,并传播到主线程。            catch (AggregateException e)            {                Console.WriteLine("捕捉异常 \n{0}", e.InnerException.ToString());            }        }         static void BasicAction()        {            Console.WriteLine("打工人打工, Thread={0}", Thread.CurrentThread.ManagedThreadId);        } 


 

 

注解:

此方法可用于执行可能并行执行的一组操作。

不保证执行操作的顺序,或是否并行执行操作。

此方法在每个提供的操作都已完成后才会返回,无论是由于正常终止还是异常终止而发生。

 

四、Parallel.For的使用

我们先用一个简单的插入,来比较并行的for循环与串行for循环的速度

 


这里因为Parallel.For在对处理器分配任务时候也有性能消耗,速度提升并不明显。

 

接下来我们看一下Parallel.For的其中重载之一

 

          

var list = new List<int>() { 10203040 };var options = new ParallelOptions();var total = 0;var result = Parallel.For(0, list.Count, () =>  {                Console.WriteLine("------------  thead --------------");                 return 1;    },              (i, loop, j) =>              {                  Console.WriteLine("------------  body --------------");                   Console.WriteLine("i=" + list[i] + " j=" + j);                   return list[i];              },     (b) =>     {     Console.WriteLine("------------  tfoot --------------");      Interlocked.Add(ref total, b);      Console.WriteLine("total=" + total);}); Console.WriteLine("iscompleted:" + result.IsCompleted);Console.Read();


注解:

因为并行任务当中不保证执行顺序,且多任务可能会同时尝试更新total变量,所以这里使用了 Interlocked.Add执行,来保证它是作为原子操作来执行

五、Parallel.ForEach+Partitioner的结合使用

Partitioner分区器:

首先我们来看看分区器源代码看他是如何对数据源进行分区的

 


Partitioner.Create 若只指定的数据源的起始于结束的索引位置,创建分区则主要是根据逻辑内核数PlatformHelper.ProcessorCount决定的

 

 


 

大部分情况下,TPL在幕后使用的负载均衡机制都是非常高效的,比如我们不使用分区器,直接对数据源进行负载均衡的并行执行,案例请看—>六、指定最大并行度。

当然我们可以自定义分区大小以下我们进入到实际的开发环境中,当前实验电脑为6核12线程处理器

注解:

dataList —>实时数据的数据源

Index —>数据源总数,此处假设1W条数据

rangesize—>区块大小,由此可以计算  10000/12+1=834(+1是为了适应可能除不尽的情况)

Partitioner.Create(0,Index,rangesize) —>分区器将数据源0-1W条数据分成了12个数据块,每一块为834条,当然最后一块没有834条数据

 

 


打上断点可以看到range.Item2-range.Item1=834,已经分好区块了,然后就是并行处理业务代码了。

这里贴上示例,粘贴可用:

            

int index = 10000;var rangesize = (int)(index / Environment.ProcessorCount) + 1;var rangePartitioner = Partitioner.Create(1, index, rangesize);System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range => {                #region 业务代码                 #endregion});


六、指定最大并行度MaxDegreeOfParallelism

参考博客文章:Parallel.ForEach 之 MaxDegreeOfParallelism

https://www.cnblogs.com/QinQouShui/p/12134232.html

 System.Threading.Tasks.Parallel.ForEach(list, new ParallelOptions() { MaxDegreeOfParallelism = 12 }, range => {                #region 业务代码                #endregion });


Parallel.ForEach并没有使用分区器,而是用TPL进行负载均衡的并行。

该重载的源代码为:

 


七、退出循环以及捕捉异常

和串行运行中的break不同,ParallelLoopState 提供了两个方法用于停止Parallel.For 和 Parallel.ForEach的执行。

 

public class ParallelLoopState{    // 获取循环的任何迭代是否已引发相应迭代未处理的异常。    public bool IsExceptional { get; }    // 获取循环的任何迭代是否已调用 ParallelLoopState.Stop()。    public bool IsStopped { get; }    // 获取在Parallel循环中调用 ParallelLoopState.Break() 的最低循环迭代。    public long? LowestBreakIteration { get; }    // 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。    public bool ShouldExitCurrentIteration { get; }     //通知Parallel循环当前迭代”之后”的其他迭代不需要运行。    public void Break();    //通知Parallel循环当前迭代“之外”的所有其他迭代不需要运行。    public void Stop();}


 

Break:用于通知Parallel循环当前迭代“之后”的其他迭代不需要运行。例如,对于从 0 到 1000 并行迭代的 for 循环,如果在第 100 次迭代调用 Break(),则低于 100 的所有迭代仍会运行(即使还未开始处理),并在退出循环之前处理完。从 101 到 1000 中还未开启的迭代则会被放弃。对于已经在执行的长时间运行迭代,Break()将为已运行还未结束的迭代对应ParallelLoopResult结构的LowestBreakIteration属性设置为调用Bread()迭代项的索引。

Stop:Stop() 用于通知Parallel循环当前迭代“之外”的所有其他迭代不需要运行,无论它们是位于当前迭代的上方还是下方。对于已经在执行的长时间运行迭代,可以检查 IsStopped属性,在观测到是 true 时提前退出。Stop 通常在基于搜索的算法中使用,在找到一个结果之后就不需要执行其他任何迭代。(比如在看视频或漫画时自动匹配响应最快的服务器)

var loopresult = System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{                #region 业务代码    loopState.Stop();                #endregion});


 

 

 

 当并行迭代中调用的委托抛出异常,这个异常没有在委托中被捕获到时,就会变成一组异常,新的System.AggregateException负责处理这一组异常。

try{    System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>    {                #region 业务代码                 #endregion    });}Catch(AggregateException ex){    foreach (var innerEx in  ex.InnerExceptions)    {       Console.WriteLine(innerEx.ToString());    }}


八、参考的资料

《.net并发编程实战》

官方文档《.NET 中的并行编程》https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/

博客园《.Net并行编程高级教程--Parallel》https://www.cnblogs.com/stoneniqiu/p/4857021.html

博客园《8天玩转并发》

https://www.cnblogs.com/huangxincheng/category/368987.html

《异步编程:.NET4.X 数据并行》

https://www.cnblogs.com/heyuquan/archive/2013/03/13/parallel-for-foreach-invoke.html

博客园《Parallel.ForEach 之 MaxDegreeOfParallelism》

https://www.cnblogs.com/QinQouShui/p/12134232.html

 end



往期精彩回顾




【推荐】.NET Core开发实战视频课程 ★★★

.NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划

【.NET Core微服务实战-统一身份认证】开篇及目录索引

Redis基本使用及百亿数据量中的使用技巧分享(附视频地址及观看指南)

.NET Core中的一个接口多种实现的依赖注入与动态选择看这篇就够了

10个小技巧助您写出高性能的ASP.NET Core代码

用abp vNext快速开发Quartz.NET定时任务管理界面

在ASP.NET Core中创建基于Quartz.NET托管服务轻松实现作业调度

现身说法:实际业务出发分析百亿数据量下的多表查询优化

关于C#异步编程你应该了解的几点建议

C#异步编程看这篇就够了


浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报