Spark调优多线程并行处理任务实现方式

作者:lshan 时间:2023-08-21 15:43:53 

方式1:

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job

所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行

而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

Spark Streaming 不同Batch任务可以并行计算么 https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行对
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))

你会发现,不同batch的job其实也可以并行运行的,这里需要有几个条件:

有延时发生了,batch无法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO则需要某个Job无法一直消耗掉所有资源

Mode是FAIR则尽力保证你的Job是并行运行的,毫无疑问是可以并行的。

方式2:

场景1:

程序每次处理的数据量是波动的,比如周末比工作日多很多,晚八点比凌晨四点多很多。

一个spark程序处理的时间在1-2小时波动是OK的。而spark streaming程序不可以,如果每次处理的时间是1-10分钟,就很蛋疼。
设置10分钟吧,实际上10分钟的也就那一段高峰时间,如果设置每次是1分钟,很多时候会出现程序处理不过来,排队过多的任务延迟更久,还可能出现程序崩溃的可能。

场景2:

  • 程序需要处理的相似job数随着业务的增长越来越多

  • 我们知道spark的api里无相互依赖的stage是并行处理的,但是job之间是串行处理的。

  • spark程序通常是离线处理,比如T+1之类的延迟,时间变长是可以容忍的。而spark streaming是准实时的,如果业务增长导致延迟增加就很不合理。

spark虽然是串行执行job,但是是可以把job放到线程池里多线程执行的。如何在一个SparkContext中提交多个任务


DStream.foreachRDD{
  rdd =>
   //创建线程池
   val executors=Executors.newFixedThreadPool(rules.length)
   //将规则放入线程池
   for( ru <- rules){
    val task= executors.submit(new Callable[String] {
     override def call(): String ={
      //执行规则
      runRule(ru,spark)
     }
    })
   }
   //每次创建的线程池执行完所有规则后shutdown
   executors.shutdown()
 }

注意点

1.最后需要executors.shutdown()。

  • 如果是executors.shutdownNow()会发生未执行完的task强制关闭线程。

  • 如果使用executors.awaitTermination()则会发生阻塞,不是我们想要的结果。

  • 如果没有这个shutdowm操作,程序会正常执行,但是长时间会产生大量无用的线程池,因为每次foreachRDD都会创建一个线程池。

2.可不可以将创建线程池放到foreachRDD外面?

不可以,这个关系到对于scala闭包到理解,经测试,第一次或者前几次batch是正常的,后面的batch无线程可用。

3.线程池executor崩溃了就会导致数据丢失

原则上是这样的,但是正常的代码一般不会发生executor崩溃。至少我在使用的时候没遇到过。

来源:https://www.cnblogs.com/lshan/p/13356037.html

标签:Spark,调优,多,线程,并行
0
投稿

猜你喜欢

  • Unity中的RegisterPlugins实用案例深入解析

    2022-04-02 10:14:41
  • Java实现简易俄罗斯方块

    2022-12-18 14:07:58
  • Android实现中国象棋附源码下载

    2023-12-20 17:09:10
  • Android获取手机文件夹及文件列表的方法

    2022-04-22 22:22:32
  • C#实现简单记事本程序

    2022-07-20 23:52:37
  • C#实现图形区域组合操作的方法

    2023-05-01 19:08:21
  • newtonsoft.json解析天气数据出错解决方法

    2022-03-10 12:23:21
  • android10 隐藏SystemUI锁屏下的多用户图标的示例代码

    2023-12-12 03:14:26
  • C# 图片格式转换的实例代码

    2023-03-11 13:32:46
  • WPF+DiffPlex实现文本比对工具

    2022-04-20 21:32:07
  • Spring如何基于注解配置使用ehcache

    2022-08-16 03:24:32
  • ToStringBuilder类的一些心得

    2022-10-10 04:02:27
  • 解决Android横竖屏切换数据丢失问题的方法

    2021-05-24 04:26:16
  • Java压缩文件工具类ZipUtil使用方法代码示例

    2022-11-26 02:21:32
  • SpringCloud Feign实现微服务之间相互请求问题

    2022-08-29 08:20:53
  • 解析Spring Boot内嵌tomcat关于getServletContext().getRealPath获取得到临时路径的问题

    2023-08-28 07:44:11
  • 如何在mapper文件中使用in("str1","str2")

    2023-07-10 10:40:51
  • Kotlin协程Context应用使用示例详解

    2023-07-10 08:25:16
  • Android仿抖音列表效果

    2022-03-01 16:36:50
  • 基于hibernate框架在eclipse下的配置方法(必看篇)

    2022-11-09 20:30:26
  • asp之家 软件编程 m.aspxhome.com