Spark中的数据读取保存和累加器实例详解

2022-11-05 08:08:38
目录
数据读取与保存Text文件Sequence文件Object对象文件累加器累加器概念系统累加器

数据读取与保存

Text文件

对于>

1)基本语法

(1)数据读取:textFile(String)

(2)数据保存:saveAsTextFile(String)

2)实现代码demo如下:

object Operate_Text {
    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 读取输入文件
        val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
        //3.2 保存数据
        inputRDD.saveAsTextFile("textFile")
        //4.关闭连接
        sc.stop()
    }
}

Sequence文件

SequenceFile文件>

1)基本语法

    (1)数据读取:sequenceFile[ keyClass, valueClass ] (path)(2)数据保存:saveAsSequenceFile(String)

    2)实现代码demo如下:

    object Operate_Sequence {
        def main(args: Array[String]): Unit = {
            //1.创建SparkConf并设置App名称
            val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
            //2.创建SparkContext,该对象是提交Spark App的入口
            val sc: SparkContext = new SparkContext(conf)
            //3.1 创建rdd
            val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
            //3.2 保存数据为SequenceFile
            dataRDD.saveAsSequenceFile("seqFile")
            //3.3 读取SequenceFile文件
            sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
            //4.关闭连接
            sc.stop()
        }
    }
    

    Object对象文件

    对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过>

    1)基本语法

      (1)数据读取:objectFile[ k , v ] (path)(2)数据保存:saveAsObjectFile(String)

      2)实现代码demo如下:

      object Operate_Object {
          def main(args: Array[String]): Unit = {
              //1.创建SparkConf并设置App名称
              val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
              //2.创建SparkContext,该对象是提交Spark App的入口
              val sc: SparkContext = new SparkContext(conf)
              //3.1 创建RDD
              val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
              //3.2 保存数据
              dataRDD.saveAsObjectFile("objFile")
              //3.3 读取数据
              sc.objectFile[Int]("objFile").collect().foreach(println)
              //4.关闭连接
              sc.stop()
          }
      }
      

      累加器

      累加器概念

      累加器,是一种变量---分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。

      累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

      系统累加器

      1)累加器定义(SparkContext.accumulator(initialValue)方法)

      val>

      2)累加器添加数据(累加器.add方法)

      sum.add(count)

      3)累加器获取数据(累加器.value)

      sum.value

      注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

      4)累加器要放在行动算子中

      因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。

      5) 代码实现:

      object accumulator_system {
      package com.atguigu.cache
      import org.apache.spark.rdd.RDD
      import org.apache.spark.util.LongAccumulator
      import org.apache.spark.{SparkConf, SparkContext}
      object accumulator_system {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
          val sc = new SparkContext(conf)
          val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
          //需求:统计a出现的所有次数 ("a",10)
          //普通算子实现 reduceByKey 代码会走shuffle 效率低
          val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
          //累加器实现
          //1 声明累加器
          val accSum: LongAccumulator = sc.longAccumulator("sum")
          dataRDD.foreach{
            case (a,count) => {
              //2 使用累加器累加  累加器.add()
              accSum.add(count)
              // 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量
              //println("sum = " + accSum.value)
            }
          }
          //3 获取累加器的值 累加器.value
          println(("a",accSum.value))
          sc.stop()
        }
      }

      以上就是Spark中的数据读取保存和累加器实例详解的详细内容,更多关于Spark数据读取保存累加器的资料请关注易采站长站其它相关文章!