Spark RDD cogroup, groupWith

- 2 mins

cogroup [Pair], groupWith [Pair]

Both cogroup and groupWith operate on RDDs of [K, V] pairs. They are very useful functions that group together the values sharing the same key across different RDDs.

Function prototypes:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

Examples:

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))

b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)


val d = a.map((_, "d"))

b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)


val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))), 
(2,(ArrayBuffer(banana),ArrayBuffer())), 
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))
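
The last result shows the key behavior: cogroup keeps every key that appears in either RDD, and a key missing from one side simply gets an empty collection on that side. The same semantics can be sketched with plain Scala collections, no Spark required. Note that cogroupLocal is a hypothetical helper written for illustration, not part of the Spark API:

```scala
// Hypothetical local sketch of cogroup's semantics (illustration only, not Spark API).
// For every key present in EITHER input, pair up the values from both sides;
// a key absent from one side contributes an empty Seq on that side.
def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val l = left.groupBy(_._1).view.mapValues(_.map(_._2)).toMap
  val r = right.groupBy(_._1).view.mapValues(_.map(_._2)).toMap
  (l.keySet ++ r.keySet).map { k =>
    k -> (l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W]))
  }.toMap
}

// Same data as the x/y example above
val x = Seq((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi"))
val y = Seq((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad"))
val grouped = cogroupLocal(x, y)

println(grouped(2)) // key 2 only on the left: right side is empty
println(grouped(5)) // key 5 only on the right: left side is empty
println(grouped(1)) // key 1 on both sides, with two right-side values
```

Spark's real cogroup does the same thing, but distributed: it co-partitions both RDDs by key so matching keys land in the same partition before grouping.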