博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
saprk里面的action - aggregate
阅读量:6646 次
发布时间:2019-06-25

本文共 2558 字,大约阅读时间需要 8 分钟。

上一篇讲到了spark里面的action函数:

 

Action列表:

  • reduce
  • collect
  • count
  • first
  • take
  • takeSample
  • takeOrdered
  • saveAsTextFile
  • saveAsSequenceFile
  • saveAsObjectFile
  • countByKey
  • foreach

action貌似还有:

  按默认或者指定的排序规则返回前n个元素,默认按降序输出
   lookup(k):作用于K-V类型的RDD上,返回指定K的所有V值
 
里面有几个比较难搞清楚的函数,比如 aggregate , 还有 aggregateByKey
参考 http://blog.csdn.net/zhihaoma/article/details/52609503
 

aggregate(zeroValue,seq,comb,taskNums)

将初始值和第一个分区中的第一个元素传递给seq函数进行计算,然后将计算结果和第二个元素传递给seq函数,直到计算到最后一个值。第二个分区中也是同理操作。最后将初始值、所有分区的结果经过combine函数进行计算(先将前两个结果进行计算,将返回结果和下一个结果传给combine函数,以此类推),并返回最终结果。

 

>>> data = sc.parallelize((1,2,3,4,5,6),2)>>> def seq(a,b):...     print 'seqOp:'+str(a)+"\t"+str(b)...     return min(a,b)... >>> def combine(a,b):...     print 'comOp:'+str(a)+"\t"+str(b)...     return a+b... >>> data.aggregate(3,seq,combine)seqOp:3  1seqOp:1  2seqOp:1  3seqOp:3  4seqOp:3  5seqOp:3  6comOp:3  1comOp:4  37>>>

注意里面有一个初始值,而初始值既用在seq里面,也用在combine里面。

 

 

aggregateByKey(zeroValue,seq,comb,taskNums)

在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

注意:aggregateByKey中的初始值只需要和reduce函数计算,不需要和combine函数结合计算,所以导致结果有点不一样。

 

注意这里面 combine 会把不同分区的合起来。而且函数里面也带了taskNums,非常绕。
先看例子:
val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))    val rdd = sc.parallelize(data)    val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(      // seqOp      math.max(_,_),      // combOp      _+_    )

得到:

根据Key值的不同,可以分为3个组:(1)  (1,3),(1,2),(1,4);(2)  (2,3);(3)  (3,6),(3,8)。这3个组分别进行seqOp,也就是(K,V)里面的V和0进行math.max()运算,运算结果和下一个V继续运算,以第一个组为例,运算过程是这样的:0, 3 => 33, 2 => 33, 4 => 4所以最终结果是(1,4)。combOp是对把各分区的V加起来,由于这里并没有分区,所以实际上是不起作用的。

运行结果:

(2,3)(1,4)(3,8)

那么如果增加了分区,结果:

如果生成RDD时分成3个区:val rdd = sc.parallelize(data,3)运行结果就变成了:(3,8)(1,7)(2,3)这是因为一个分区返回(1,3),另一个分区返回(1,4),combOp将这两个V加起来,就得到了(1,7)。

 

再看例子:
>>> data = sc.parallelize([(1,3),(1,2),(1,4),(2,3)])>>> def seq(a,b):...     return max(a,b)... >>> def combine(a,b):...     return a+b... >>> data.aggregateByKey(3,seq,comb,4).collect()[(1, 10), (2, 3)]

注意上面,如果最后一个参数是1或者2,那么结果是7.

 

 

fold函数

  • reduce()与fold()方法是对同种元素类型数据的RDD进行操作,即必须同构。其返回值返回一个同样类型的新元素。

fold()与reduce()类似,接收与reduce接收的函数签名相同的函数,另外再加上一个初始值作为第一次调用的结果。(例如,加法初始值应为0,乘法初始值应为1)

num.fold(0,lambda x,y:x+y)

 

aggregate()方法可以对两个不同类型的元素进行聚合,即支持异构。

 

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

  1. scala> rdd1.fold(1)(
  2. | (x,y) => x + y
  3. | )
  4. res19: Int = 58
  5.  
  6. ##结果同上面使用aggregate的第一个例子一样,即:
  7. scala> rdd1.aggregate(1)(
  8. | {(x,y) => x + y},
  9. | {(a,b) => a + b}
  10. | )
  11. res20: Int = 58
  12.  

 

另外还有foldLeft foldRight

细节先不看了。

 

你可能感兴趣的文章
鸿蒙os相机,鸿蒙os有什么功能-有什么特殊之处
查看>>
微信html5图片裁切,微信小程序图片裁剪工具we-cropper
查看>>
小学生学计算机flash,利用Flash软件进行小学电脑绘画教学
查看>>
html form 与table,form和table的区别
查看>>
【原创】MySQL 模拟Oracle邻接模型树形处理
查看>>
SSD上如何进行数据保护?
查看>>
Verizon:2012年数据破坏调查报告
查看>>
今日你以老师为荣,明日老师以你为荣!
查看>>
华为交换机VRP用户界面配置及Telnet登录实验
查看>>
Cobbler无人值守安装系统史上最细实践文档
查看>>
第11章代码《跟老男孩学习Linux运维:Shell高级编程实战》
查看>>
一个资深系统管理员的O2O实践(二)
查看>>
ovs-vsctl emer-reset一个需要慎用的命令
查看>>
LVM配置与管理
查看>>
PIX8.0与两个PIX8.0建立L2L ***
查看>>
PowerHA Daily management
查看>>
SAP R3 define bank info
查看>>
我的第100篇博客
查看>>
一图胜千言 -- SQL Server 日常运维
查看>>
screen:一款开源的会话保持软件
查看>>