==> mapPartitionsWithIndex
成都创新互联公司是一家专注于网站建设、网站设计与策划设计,东平网站建设哪家好?成都创新互联公司做网站,专注于网站建设十载,网设计领域的专业建站公司;建站业务涵盖:东平等地区。东平做网站价格咨询:028-86922220---> 定义: def mapPartitionsWithIndex[U](f:(Int, Iterator[T]) => Iterator[U], preserversPartitioning: Boolean = false)
---> 作用: 对 RDD 每个分区进行操作,带有分区号
---> 示例:输出分区号和内容
// 创建一个RDD val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) // 创建一个函数,作为 f 的值 def func(index:Int, iter:Iterator[Int]):Iterator[String] = { iter.toList.map(x=>"[PartID: " + index + ", value= " + x + "]").iterator } // 调用 rdd1.mapPartitionsWithIndex(func).colect // 结果 res15: Array[String] = Array([PartitionID: 0,value=1], [PartitionID: 0,value=2], [PartitionID: 0,value=3], [PartitionID: 0,value=4], [PartitionID: 1,value=5], [PartitionID: 1,value=6], [PartitionID: 1,value=7], [PartitionID: 1,value=8], [PartitionID: 1,value=9])
==> aggregate
---> 定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U, T) => U, combOp: (U, U) => U): U
---- (zeroValue: U) 初始值
---- seqOp:(U, T) => U 局部操作
---- combOp:(U, U) => U 全局操作
---> 作用:先对局部进行操作,再对全局进行操作
---> 示例:
// 求两个分区大值的和,初始值为0 val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9)) rdd1.aggregate(0)(math.max(_,_), _+_) // 结果为:res16: Int = 13
==> aggregateByKey
---> 定义:
---> 作用:对 key-value 格式 的数据进行 aggregate 操作
---> 示例:
// 准备一个 key-value 格式的 RDD val parRDD = sc.parallelize(List(("cat", 2),("cat", 5),("mouse", 4),("cat", 12),("dog", 12),("mouse", 2)), 2) // 计算每个分区中的动物最多的个数求和 parRDD.aggregateByKey(0)(math.max(_, _), _+_) // 结果为: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) // 计算每种动物的总数量 parRDD.aggregateByKey(0)(_+_, _+_).collect // 方法一 parRDD.reduceByKey(_+_).collect
==> coalesce 与 repartition
---> 作用:将 RDD 中的分区进行重分区
---> 区别: coalesce 默认不会进行 shuffle(false)
repartition 会进行 shuffle(true), 会将数据真正通过网络进行重分区
---> 示例:
// 定义一个 RDD val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8), 2) // 显示分区中的分区号和分区号中的内容 def func(index:Int, iter:Iterator[Int]):Iterator[String] = { iter.toList.map(x=>"[PartID: " + index + ", value= " + x + "]").iterator } // 查看 rdd 中的分区情况 rdd.mapPartitionsWithIndex(func).collect // 结果为: Array[String] = Array( // [PartID: 0, value= 1], [PartID: 0, value= 2], [PartID: 0, value= 3], [PartID: 0, value= 4], // [PartID: 1, value= 5], [PartID: 1, value= 6], [PartID: 1, value= 7], [PartID: 1, value= 8]) // 使用 repartition 将分区数改为3 val rdd2 = rdd1.repartition(3) val rdd3 = rdd1.coalesce(3, true) // 查看rdd2 与rdd3 的分区情况 rdd2.mapPartitionsWithIndex(func).collect rdd3.mapPartitionsWithIndex(func).collect // 结果为:Array[String] = Array( // [PartID: 0, value= 3], [PartID: 0, value= 6], // [PartID: 1, value= 1], [PartID: 1, value= 4], [PartID: 1, value= 7], // [PartID: 2, value= 2], [PartID: 2, value= 5])
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
网站栏目:Scala高级算子-创新互联
标题链接:http://scgulin.cn/article/jjhei.html