cogroup 和 groupWith 都是作用于键值对 RDD(元素形如 (K, V))的函数,其中 groupWith 是 cogroup 的别名。它们非常有用:能够把多个 RDD 中相同 key 对应的 value 分别归组到一起;如果某个 key 在其中一个 RDD 里不存在,则该 RDD 对应的位置是一个空集合。
函数原型:
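def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]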
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
例子:
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)
val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)
val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
(2,(ArrayBuffer(banana),ArrayBuffer())),
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))