Spark中的数据读取保存和累加器实例详解

作者:欣xy 时间:2022-09-13 19:26:46 

数据读取与保存

Text文件

对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆。

1)基本语法

(1)数据读取:textFile(String)

(2)数据保存:saveAsTextFile(String)

2)实现代码demo如下:

object Operate_Text {
   def main(args: Array[String]): Unit = {
       //1.创建SparkConf并设置App名称
       val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
       //2.创建SparkContext,该对象是提交Spark App的入口
       val sc: SparkContext = new SparkContext(conf)
       //3.1 读取输入文件
       val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
       //3.2 保存数据
       inputRDD.saveAsTextFile("textFile")
       //4.关闭连接
       sc.stop()
   }
}

Sequence文件

SequenceFile文件 是Hadoop中用来存储二进制形式的 key-value对 的一种平面文件(Flat File)。在SparkContext中,可以通过调用 sequenceFile[ keyClass,valueClass ] (path) 来调用。

1)基本语法

  • (1)数据读取:sequenceFile[ keyClass, valueClass ] (path)

  • (2)数据保存:saveAsSequenceFile(String)

2)实现代码demo如下:

object Operate_Sequence {
   def main(args: Array[String]): Unit = {
       //1.创建SparkConf并设置App名称
       val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
       //2.创建SparkContext,该对象是提交Spark App的入口
       val sc: SparkContext = new SparkContext(conf)
       //3.1 创建rdd
       val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
       //3.2 保存数据为SequenceFile
       dataRDD.saveAsSequenceFile("seqFile")
       //3.3 读取SequenceFile文件
       sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
       //4.关闭连接
       sc.stop()
   }
}

Object对象文件

对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过 objectFile[ k , v ] (path) 函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为要序列化所以要指定类型。

1)基本语法

  • (1)数据读取:objectFile[ k , v ] (path)

  • (2)数据保存:saveAsObjectFile(String)

2)实现代码demo如下:

object Operate_Object {
   def main(args: Array[String]): Unit = {
       //1.创建SparkConf并设置App名称
       val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
       //2.创建SparkContext,该对象是提交Spark App的入口
       val sc: SparkContext = new SparkContext(conf)
       //3.1 创建RDD
       val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
       //3.2 保存数据
       dataRDD.saveAsObjectFile("objFile")
       //3.3 读取数据
       sc.objectFile[Int]("objFile").collect().foreach(println)
       //4.关闭连接
       sc.stop()
   }
}

累加器

累加器概念

累加器,是一种变量---分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。

累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

系统累加器

1)累加器定义(SparkContext.accumulator(initialValue)方法)

val sum: LongAccumulator = sc.longAccumulator("sum")

2)累加器添加数据(累加器.add方法)

sum.add(count)

3)累加器获取数据(累加器.value)

sum.value

注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

4)累加器要放在行动算子中

因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。

5) 代码实现:

object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
   val sc = new SparkContext(conf)
   val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
   //需求:统计a出现的所有次数 ("a",10)
   //普通算子实现 reduceByKey 代码会走shuffle 效率低
   val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
   //累加器实现
   //1 声明累加器
   val accSum: LongAccumulator = sc.longAccumulator("sum")
   dataRDD.foreach{
     case (a,count) => {
       //2 使用累加器累加  累加器.add()
       accSum.add(count)
       // 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量
       //println("sum = " + accSum.value)
     }
   }
   //3 获取累加器的值 累加器.value
   println(("a",accSum.value))
   sc.stop()
 }
}

来源:https://juejin.cn/post/7159579713738899486

标签:Spark,数据读取,数据保存,累加器
0
投稿

猜你喜欢

  • Django与JS交互的示例代码

    2023-05-03 13:38:58
  • python tkinter GUI绘制,以及点击更新显示图片代码

    2021-07-27 14:11:33
  • Django发送邮件和itsdangerous模块的配合使用解析

    2023-09-11 04:29:00
  • Python教程之成员和身份运算符的用法详解

    2021-04-19 11:36:56
  • JavaScript实现简单贪吃蛇效果

    2023-08-13 05:48:08
  • Python 实现递归法解决迷宫问题的示例代码

    2021-01-31 08:14:23
  • Python初学者必须掌握的25个内置函数详解

    2022-07-02 16:09:21
  • python Dataframe 合并与去重详情

    2022-08-17 02:18:54
  • python 代码实现k-means聚类分析的思路(不使用现成聚类库)

    2021-05-28 06:30:24
  • Mysql临时表及分区表区别详解

    2024-01-23 16:28:00
  • Python-numpy实现灰度图像的分块和合并方式

    2021-06-14 16:24:27
  • 数据结构-树(三):多路搜索树B树、B+树

    2024-01-27 01:21:43
  • Python如何优雅获取本机IP方法

    2021-03-07 15:46:16
  • python利用标准库如何获取本地IP示例详解

    2021-10-17 07:46:07
  • Python数据分析之缺失值检测与处理详解

    2021-10-04 01:09:08
  • Python Numpy之linspace用法说明

    2021-10-17 10:49:55
  • Python Pandas知识点之缺失值处理详解

    2023-09-29 20:23:16
  • Chrome V8 引擎对 sort 的优化

    2010-02-04 17:27:00
  • VSCode配置python环境及中文问题解决方法

    2022-07-14 15:39:02
  • MSSQL安全设置的具体步骤和方法小结

    2024-01-18 05:55:43
  • asp之家 网络编程 m.aspxhome.com