c# Parallel类的使用

作者:一只独行的猿 时间:2022-09-09 12:30:21 

Parallel类是对线程的抽象,提供数据与任务的并行性。类定义了静态方法For和ForEach,使用多个任务来完成多个作业。Parallel.For和Parallel.ForEach方法在每次迭代的时候调用相同的代码,而Parallel.Invoke()方法允许同时调用不同的方法。Parallel.ForEach()方法用于数据的并行性,Parallel.Invoke()方法用于任务的并行性。

1、For()方法

For()方法用于多次执行一个任务,可以并行运行迭代,但迭代的顺序并没指定。For()方法前两个参数为定义循环的开始和结束,第三个参数为Action<int>委托。方法的返回值是ParallelLoopResult结构,它提供了是否结束的信息。如以下循环方法,不能保证输出顺序: 


static void ParallelFor()
{
 ParallelLoopResult result =
   Parallel.For(0, 10, async i =>
     {
       Console.WriteLine("{0}, task: {1}, thread: {2}", i,
         Task.CurrentId, Thread.CurrentThread.ManagedThreadId);

await Task.Delay(10);//异步方法,用于释放线程供其他任务使用。完成后,可能看不到方法的输出,因为主(前台线)程结束,所有的后台线程也将结束
       Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
     });
 Console.WriteLine("Is completed: {0}", result.IsCompleted);
}

异步功能虽然方便,但是知道后台发生了什么仍然重要,必须留意。

提前停止For()方法

可以根据条件提前停止For()方法,而不必完成全部的迭代。,传入参数ParallelLoopState的对象,调用Break()方法或者Stop()方法。如调用Break()方法,当迭代值大于15的时候中断(当前线程结束,类似于普通for的Continue),但其他任务可以同时运行,有其他值的任务也可以运行(如果当前线程是主线程,那么就等同于Stop(),结束所有线程)。Stop()方法结束的是所有操作(类似于普通for的Break)。利用LowestBreakIteration属性可以忽略其他任务的结果:


static void ParallelFor()
{
 ParallelLoopResult result = Parallel.For(10, 40, (int i, ParallelLoopState pls) =>
    {
      Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
      Thread.Sleep(10);
      if (i > 15)
        pls.Break();
    });
 Console.WriteLine("Is completed: {0}", result.IsCompleted);
 if (!result.IsCompleted)
   Console.WriteLine("lowest break iteration: {0}", result.LowestBreakIteration);
}

For()方法可以使用几个线程执行循环。如果要对每个线程进行初始化,就需要使用到For<TLocal>(int, int, Func<TLocal>, Func<int, ParallelLoopState, TLocal, TLocal> , Action<TLocal>)方法。

  • 前两个参数是对应的循环起始和终止条件;

  • 第二个参数类型是Func<TLocal>,返回一个值,传递给第三个参数。

  • 第三个参数类型是Func<int, ParallelLoopState, TLocal, TLocal>,是循环体的委托,其内部的第一个参数是循环迭代,内部第二个参数允许停止迭代,内部第三个参数用于接收For()方法的前一个参数的返回值。循环体应当返回与For()循环泛型类型一致的值。

  • 第四个参数是指定的一个委托,用于执行相关后续操作。


static void ParallelFor()
{
 Parallel.For<string>(0, 20, () =>
  {
    // invoked once for each thread
    Console.WriteLine("init thread {0}, task {1}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
    return String.Format("t{0}", Thread.CurrentThread.ManagedThreadId);
  },
  (i, pls, str1) =>
  {
    // invoked for each member
    Console.WriteLine("body i {0} str1 {1} thread {2} task {3}", i, str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
    Thread.Sleep(10);
    return String.Format("i {0}", i);
  },
  (str1) =>
  {
    // final action on each thread
    Console.WriteLine("finally {0}", str1);
  });
}

2、使用ForEach()方法循环

ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但是以异步方式遍历,没有确定的顺序。如果要中断循环,同样可以采用ParallelLoopState参数。ForEach<TSource>有许多泛型的重载方法。


static void ParallelForeach()
{
 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" };

ParallelLoopResult result = Parallel.ForEach<string>(data, s =>
      {
        Console.WriteLine(s);
      });
 Parallel.ForEach<string>(data, (s, pls, l) =>
 {
   Console.WriteLine("{0} {1}", s, l);
 });
}

3、调用多个方法
如果有多个任务并行,可以使用Parallel.Invoke()方法,它提供任务的并行性模式:


static void ParallelInvoke()
{
 Parallel.Invoke(Foo, Bar);
}

static void Foo()
{
 Console.WriteLine("foo");
}

static void Bar()
{
 Console.WriteLine("bar");
}

4、For()方法的取消

在For()方法的重载方法中,可以传递一个ParallelOptions类型的参数,利用此参数可以传递一个CancellationToken参数。使用CancellationTokenSource对象用于注册CancellationToken,并允许调用Cancel方法用于取消操作。

一旦取消操作,For()方法就抛出一个OperationCanceledException类型的异常,使用CancellationToken可以注册取消操作时的信息。调用Register方法,传递一个在取消操作时调用的委托。通过取消操作,可以将其他的迭代操作在启动之前取消,但已经启动的迭代操作允许完成。取消操作是以协作方式进行的,以避免在取消迭代操作的中间泄露资源。


static void CancelParallelLoop()
{
 var cts = new CancellationTokenSource();
 cts.Token.ThrowIfCancellationRequested();
 cts.Token.Register(() => Console.WriteLine("** token cancelled"));
 // 在500ms后取消标记
 cts.CancelAfter(500);
 try
 {
   ParallelLoopResult result = Parallel.For(0, 100,
     new ParallelOptions()
     {
       CancellationToken = cts.Token
     },
       x =>
       {
         Console.WriteLine("loop {0} started", x);
         int sum = 0;
         for (int i = 0; i < 100; i++)
         {
           Thread.Sleep(2);
           sum += i;
         }
         Console.WriteLine("loop {0} finished", x);
       });
 }
 catch (OperationCanceledException ex)
 {
   Console.WriteLine(ex.Message);
 }
}

5、发现存在的问题

使用并行循环时,若出现以下两个问题,需要使用Partitioner(命名空间 System.Collections.Concurrent中)解决。

  1. 使用并行循环时,应确保每次迭代的工作量要明显大于同步共享状态的开销。 如果循环把时间都耗在了阻塞式的访问共享的循环变量上,那么并行执行的好处就很容易完全丧失。尽可能让每次循环迭代都只是在局部进行,避免阻塞式访问造成的损耗。见示例1

  2. 并行循环的每一次迭代都会生成一个委托,如果每次生成委托或方法的开销比迭代完成的工作量大,使用并行方案就不适合了(委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。 但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来)。见示例2

示例1中,求1000000000以内所有自然数开方的和。第一部分采用直接计算的方式,第二部分采用分区计算。第二部分的Partitioner 会把需要迭代的区间分拆为多个不同的空间,并存入Tuple对象中。


/*   示例1  */public static void PartitionerTest()
{
 //使用计时器
 System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch();
 const int maxValue = 1000000000;
 long sum = 0;
 stopwatch.Restart();//开始计时
 Parallel.For(0, maxValue, (i) => {
   Interlocked.Add(ref sum, (long )Math.Sqrt(i));//Interlocked是原子操作,多线程访问时的线程互斥操作
 });
 stopwatch.Stop();
 Console.WriteLine($"Parallel.For:{stopwatch.Elapsed}");//我的机器运行出的时间是:00:01:37.0391204

var partitioner = System.Collections.Concurrent.Partitioner.Create(0, maxValue);//拆分区间
 sum = 0;
 stopwatch.Restart();
 Parallel.ForEach(partitioner, (rang) => {
   long partialSum = 0;
   //迭代区间的数据
   for(int i=rang.Item1;i<rang.Item2;i++)
   {
     partialSum += (long)Math.Sqrt(i);
   }
   Interlocked.Add(ref sum, partialSum);//原子操作
 });
 stopwatch.Stop();
 Console.WriteLine($"Parallel.ForEach:{stopwatch.Elapsed}"); //我的机器运行出的时间是:00:00:02.7111666
}

Partitioner的分区是静态的,只要迭代分区划分完成,每个分区上都会运行一个委托。如果某一段区间的迭代次数提前完成,也不会尝试重新分区并让处理器分担工作。 对于任意IEnumerable<T>类型都可以创建不指定区间的分区,但这样就会让每个迭代项目都创建一个委托,而不是对每个区间创建委托。创建自定义的Partitioner可以解决这个问题,代码比较复杂。请自行参阅:http://www.writinghighperf.net/go/20

示例2中,采用一个委托方法来计算两个数之间的关系值。前一种是每次运行都重新构造委托,后一种是先构造出委托的方法而后每一次调用。


//声明一个委托
private delegate int MathOp(int x, int y);
private int Add(int x,int y)
{
  return x + y;
}

private int DoOperation(MathOp op,int x,int y)
{
  return op(x, y);
}

/*
* 委托会设计两类开销:构造开销和调用开销。大多数调用开销和普通方法的调用差不多。 但委托是一种对象,构造开销可能相当大,最好是只做一次构造,然后把对象缓存起来。
*/
public void Test()
{
  System.Diagnostics.Stopwatch stopwatch = new System.Diagnostics.Stopwatch();
  stopwatch.Restart();
  for(int i=0;i<10;i++)
  {
    //每一次遍历循环,都会产生一次构造和调用开销
    DoOperation(Add, 1, 2);
  }
  stopwatch.Stop();
  Console.WriteLine("Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0003812

stopwatch.Restart();
  MathOp op = Add;//只产生一次构造开销
  for(int i=0;i<10;i++)
  {
    DoOperation(op, 1, 2);//每一次遍历都只产生遍历开销
  }
  stopwatch.Stop();
  Console.WriteLine("Once Construction and invocation: {0}", stopwatch.Elapsed);//00:00:00.0000011
}

来源:https://www.cnblogs.com/pilgrim/p/9356972.html

标签:c#,Parallel
0
投稿

猜你喜欢

  • Mybatis返回插入的主键问题解决方案

    2023-05-06 02:58:03
  • Java 实战练习之网上电商项目的实现

    2021-07-17 04:23:59
  • Java集合去重导致的线上问题

    2022-01-24 04:52:29
  • c# 基于wpf,开发OFD电子文档阅读器

    2023-09-08 00:10:02
  • 使用spring通过aop获取方法参数和参数值

    2022-06-04 16:28:33
  • jstl标签基础开发步骤(详解)

    2023-07-08 18:25:41
  • Java 8 中的 10 个特性总结及详解

    2023-07-21 00:06:51
  • 剑指Offer之Java算法习题精讲数组与字符串

    2021-05-24 19:21:45
  • java读写oracle的blob字段示例

    2023-12-22 16:19:00
  • 教大家使用java实现顶一下踩一下功能

    2021-08-08 21:31:15
  • Spring Boot2如何构建可部署的war包

    2023-11-29 06:40:59
  • JAVA内存模型(JMM)详解

    2023-11-23 16:54:32
  • opencv 做人脸识别 opencv 人脸匹配分析

    2023-07-09 06:34:44
  • Java单例模式的8种写法(推荐)

    2023-01-06 14:23:27
  • IntelliJ IDEA 2020.2正式发布,两点多多总能助你提效

    2023-08-30 18:15:18
  • Java类库BeanUtils组件使用方法及实例详解

    2022-09-28 00:37:49
  • pagehelper插件显示total为-1或1的问题

    2021-11-04 01:02:39
  • C#SuperSocket的搭建并配置启动总结

    2022-01-25 15:16:24
  • 一文带你搞懂Java定时器Timer的使用

    2022-09-08 01:18:16
  • 基于C#调用c++Dll结构体数组指针的问题详解

    2021-12-10 23:16:41
  • asp之家 软件编程 m.aspxhome.com