SparkValue类型的常用算子

Spark RDD常用算子:Value类型

Spark之所以比Hadoop灵活和强大,其中一个原因是Spark内置了许多有用的算子,也就是方法。通过对这些方法的组合,编程人员就可以写出自己想要的功能。说白了spark编程就是对spark算子的使用,下面为大家详细讲解一下SparkValue类型的常用算子

map

函数说明:

map() 接收一个函数,该函数将RDD中的元素逐条进行映射转换,可以是类型的转换,也可以是值的转换,将函数的返回结果作为结果RDD编程。

函数签名:

def map[U: ClassTag](f: T => U): RDD[U]

案例演示

   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -map
   val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
   val mapRdd1 = rdd.map(
     _*2
   )
   mapRdd1.collect().foreach(println)
   sc.stop()

运行结果

2
4
6
8

mapPartitons

函数说明:

将待处理的数据以分区为单位发送到待计算节点上进行处理,mapPartition是对RDD的每一个分区的迭代器进行操作,返回的是迭代器。这里的处理可以进行任意的处理。

函数签名:

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示

 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -mapPartitons 计算每个分区的最大数
   val rdd = sc.makeRDD(List(1, 34, 36,345,2435,2342,62,35, 4),4)
   val mapParRdd = rdd.mapPartitions(
     iter => {
       List(iter.max).iterator
     }
   )
   mapParRdd.foreach(println)
   sc.stop()
 }

运行结果:

62
2435
34
345

mapPartitonsWithIndex

函数说明:

将待处理的数据以分区为单位发送到计算节点上,这里的处理可以进行任意的处理,哪怕是过滤数据,在处理的同时可以获取当前分区的索引值。

函数签名:

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示:

  1. 将数据进行扁平化映射并且打印所在的分区数
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark""Hello Scala""Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       datas.map(
         num => {
           (index, num)
         }
       )
     }
   )
   mpwiRdd.collect().foreach(println)
 }

运行结果:

(0,Hello)
(0,Spark)
(1,Hello)
(1,Scala)
(1,Word)
(1,Count)
  1. 将数据进行扁平化映射只打印所在第一分区的数据
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark""Hello Scala""Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       if (index==0){
         datas.map(
           num => {
             (index, num)
           }
         )
       }else{
       Nil.iterator
       }
     }
   )
   mpwiRdd.collect().foreach(println)

运行结果:

(0,Hello)
(0,Spark)

flatMap

函数说明:

将数据进行扁平化之后在做映射处理,所以算子也称为扁平化映射

函数签名:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

案例演示:

将每个单词进行扁平化映射

def main(args: Array[String]): Unit = {
 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
 val sc = new SparkContext(sparkConf)
 //算子 -map
 val rdd = sc.makeRDD(List("Hello Scala","Hello Spark"), 2)
 val FltRdd = rdd.flatMap(
   _.split(" ")
 )
 FltRdd.foreach(println)
 sc.stop()
}

运行结果:

Hello
Scala
Hello
Spark

glom

函数说明:

glom的作用就是将一个分区的数据合并到一个array中。

函数签名:

def glom(): RDD[Array[T]]

案例演示:

  1. 将不同分区rdd的元素合并到一个分区
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2)
   val glomRdd = rdd.glom()
   glomRdd.collect().foreach(data=>println(data.mkString(",")))
   sc.stop()
 }

运行结果:

1,2,3,4
5,6,7,8,9

groupBy

函数说明:

将数据根据指定的规则进行分组,分区默认不变,单数数据会被打乱,我们成这样的操作为shuffer,

函数签名:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

案例演示:

  1. 按照奇偶数进行groupby分区
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,10),2)
   val groupByRDD = rdd.groupBy(_ % 2 == 0)
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

运行结果:

(false,CompactBuffer(1, 3, 5, 7))
(true,CompactBuffer(2, 4, 6, 8, 10))
  1. 按照单词的首字母进行分组
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark"))
   val groupByRDD = rdd.groupBy(_.charAt(0))
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

运行结果:

(T,CompactBuffer(Tom, Timi))
(H,CompactBuffer(Hello))
(S,CompactBuffer(Scala, Spark))

filter

函数说明:

filter即过滤器的意思,所以filter算子的作用就是过滤的作用。filter将根据指定的规则进行筛选过滤,符合条件的数据保留,不符合的数据丢弃,当数据进行筛选过滤之后,分区不变,但分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数签名:

def filter(f: T => Boolean): RDD[T]

案例演示:

  1. 筛选出能被二整除的数字
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265))
   val filterRDD = rdd.filter(_ % 2 == 0)
   filterRDD.collect().foreach(println)
   sc.stop()
 }

运行结果:

46
246
2346
3276
234
6234
246
24
6246
26

2.筛选单词中包含H的

 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper"))
   val filterRDD = rdd.filter(_.contains("H"))
   filterRDD.collect().foreach(println)
   sc.stop()
 }

运行结果:

Hello
Horber
Hbeer
ersfgH
Hadoop

原创文章,作者:晴川运维,如若转载,请注明出处:https://baike.qcidc.com/14329.html

(0)
晴川运维晴川运维
上一篇 2025年10月5日
下一篇 2025年10月5日

相关推荐

  • Linux sed命令完全攻略(超级详细)

    我们知道,Vim 采用的是交互式文本编辑模式,你可以用键盘命令来交互性地插入、删除或替换数据中的文本。但本节要讲的 sed 命令不同,它采用的是流编辑模式,最明显的特点是,在 se…

    Linux系统 2025年9月24日
  • Linux中通过Timekpr-nExt限制Linux账户的使用

    本周的开源软件亮点是 Timekpr-nExt。它是一个 GUI 应用,用于限制 Linux 系统中某些账户的电脑使用。对于不想让孩子花太多时间在电脑上的父母来说,这是一个方便的工…

    Linux系统 2025年10月20日
  • APF防火墙的安装和使用

    APF(Advanced Policy Firewall)是 Rf-x Networks 出品的Linux环境下的软件防火墙,被大部分Linux服务器管理员所采用,使用iptabl…

    Linux系统 2025年6月10日
  • Linux中历史命令的具体使用方法

    Linux系统当你在shell(控制台)中输入并执行命令时,shell会自动把你的命令记录到历史列表中,一般保存在用户目录下的.bash_history文件中。默认保存1000条,…

    Linux系统 2025年7月6日
  • CentOS7中安装Weblogic具体步骤

    WebLogic是美国Oracle公司出品的一个application server确切的说是一个基于JAVAEE架构的中间件,BEA WebLogic是用于开发、集成、部署和管理…

    Linux系统 2025年6月8日
  • Java随机数的坑你踩过吗

    随机数我们应该不陌生,业务中我们用它来生成验证码,或者对重复性要求不高的id,甚至我们还用它在年会上搞抽奖。今天我们来探讨一下这个东西。如果使用不当会引发一系列问题。 随机数我们应…

    Linux系统 2025年10月5日
  • 详解Linux内核之脏页跟踪

    Linux内核由于存在page cache, 一般修改的文件数据并不会马上同步到磁盘,会缓存在内存的page cache中,我们把这种和磁盘数据不一致的页称为脏页,脏页会在合适的时…

    Linux系统 2025年7月10日
  • Linux Mint 安装 Linux Kernel 4.12

    Linus Torvalds 发布了 Linux 内核 4.12。你可以从这里直接下载相关的 deb 包来安装。或者,继续阅读本文,按下面的步骤安装新内核。 警告:Linux 内核…

    Linux系统 2025年6月17日
  • 详解Mariadb聚合函数及分组查询

    MariaDB Server 是最流行的开源关系型数据库之一。它由 MySQL 的原始开发者制作,并保证保持开源。它是大多数云产品的一部分,也是大多数Linux发行版的默认配置。M…

    Linux系统 2025年6月8日
  • 细说Vim的高级使用方法

    Vim 是从 vi 发展出来的一个文本编辑器。代码补完、编译及错误跳转等方便编程的功能特别丰富,在程序员中被广泛使用。和Emacs并列成为类Unix系统用户最喜欢的编辑器,本篇文章…

    Linux系统 2025年10月6日

发表回复

登录后才能评论