Study notes: groupByKey, reduceByKey, and combineByKey in Spark

groupByKey groups all values that share a key; reduceByKey merges values that share a key using a binary function.

After grouping, groupByKey produces an Iterable of all values per key; reduceByKey does not, it returns a single merged value per key.

For large datasets reduceByKey is generally recommended, because reduceByKey combines values on each partition before the shuffle (a map-side combine), while groupByKey ships every value across the network and only groups them afterwards.
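The difference can be sketched with plain Scala collections, no Spark required (the data here is made up for illustration): the groupBy version materializes every value per key before aggregating, while the fold version only ever keeps a running (sum, count) pair per key, which is what reduceByKey's map-side combine does on each partition.

```scala
object ShuffleSketch {
  val pairs = Seq(("fruit", 3.0), ("fruit", 5.0), ("book", 10.0))

  // groupByKey-style: collect all values per key, then aggregate
  val viaGroup: Map[String, Double] =
    pairs.groupBy(_._1).map { case (k, vs) =>
      val prices = vs.map(_._2)
      (k, prices.sum / prices.size)
    }

  // reduceByKey-style: fold each value into a running (sum, count) per key
  val viaFold: Map[String, Double] =
    pairs
      .foldLeft(Map.empty[String, (Double, Int)]) {
        case (acc, (k, price)) =>
          val (sum, count) = acc.getOrElse(k, (0.0, 0))
          acc.updated(k, (sum + price, count + 1))
      }
      .map { case (k, (sum, count)) => (k, sum / count) }

  def main(args: Array[String]): Unit = {
    // Both strategies give the same averages
    assert(viaGroup == viaFold)
    println(viaFold)
  }
}
```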

combineByKey requires three functions: first createCombiner, second mergeValue, third mergeCombiners. It looks better suited to more complex aggregations.
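How the three functions cooperate can be sketched without Spark (the partition split below is made up for illustration): combineByKey calls createCombiner the first time a key is seen on a partition, mergeValue for each subsequent value on that partition, and mergeCombiners to merge the per-partition results during the shuffle.

```scala
object CombineSketch {
  type Combiner = (Int, Double) // (count, total price)

  // Called for the first value of a key on a partition
  val createCombiner: Double => Combiner = price => (1, price)

  // Folds one more value into a partition-local combiner
  val mergeValue: (Combiner, Double) => Combiner = {
    case ((count, total), price) => (count + 1, total + price)
  }

  // Merges combiners coming from different partitions
  val mergeCombiners: (Combiner, Combiner) => Combiner = {
    case ((c1, t1), (c2, t2)) => (c1 + c2, t1 + t2)
  }

  // Simulate one partition: createCombiner on the first value, mergeValue on the rest
  def combinePartition(partition: Seq[Double]): Combiner =
    partition.tail.foldLeft(createCombiner(partition.head))(mergeValue)

  def main(args: Array[String]): Unit = {
    // Two "partitions" holding prices for the same key
    val merged = mergeCombiners(combinePartition(Seq(3.0, 5.0)),
                                combinePartition(Seq(10.0)))
    assert(merged == ((3, 18.0)))
    println(merged)
  }
}
```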

My own practice code:

package com.practice_spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

/*
 Practice with the groupByKey, reduceByKey, and combineByKey methods.
 Reference: https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/reduceByKey--vs-groupBykey-vs-aggregateByKey-vs-combineByKey
 */

object SparkHomework {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkPractice")
      .getOrCreate()

    // Load the product info table
    val df = spark.read.parquet("your_parquet_file")

    val rows: RDD[Row] = df.rdd
    // x(4) = product category, x(3) = price (depends on the parquet schema)
    val pairs = rows.map(x => (x(4).toString, (x(3).toString.toDouble, 1)))

    // Compute the average price per product category, three ways
    groupbykeySample(pairs).saveAsTextFile("your_path_1")
    reducebykeySample(pairs).saveAsTextFile("your_path_2")
    combinebykeySample(rows).saveAsTextFile("your_path_3")

    spark.stop()
  }

  def groupbykeySample(
      rdd: RDD[(String, (Double, Int))]): RDD[(String, Double)] = {
    rdd
      .groupByKey()
      .map { case (category, items) =>
        var totalPrice = 0.0
        var totalNum = 0
        for ((price, count) <- items) {
          totalPrice += price
          totalNum += count
        }
        (category, totalPrice / totalNum)
      }
  }

  def reducebykeySample(
      rdd: RDD[(String, (Double, Int))]): RDD[(String, Double)] = {
    rdd
      .reduceByKey {
        case ((sumL, countL), (sumR, countR)) =>
          (sumL + sumR, countL + countR)
      }
      .mapValues {
        case (sum, count) => sum / count
      }
  }

  def combinebykeySample(rdd: RDD[Row]): RDD[(String, Double)] = {
    val pairs = rdd.map(x => (x(4).toString, x(3).toString.toDouble))

    type PriceCollector = (Int, Double) // (count, total price)

    // Called the first time a key is seen on a partition
    val createCombiner = (price: Double) => (1, price)

    // Merge one more value into a partition-local collector
    val mergeValue = (collector: PriceCollector, price: Double) => {
      val (count, totalPrice) = collector
      (count + 1, totalPrice + price)
    }

    // Merge collectors from different partitions
    val mergeCombiners =
      (collector1: PriceCollector, collector2: PriceCollector) => {
        val (count1, totalPrice1) = collector1
        val (count2, totalPrice2) = collector2
        (count1 + count2, totalPrice1 + totalPrice2)
      }

    pairs
      .combineByKey(createCombiner, mergeValue, mergeCombiners)
      .map { case (category, (count, totalPrice)) =>
        (category, totalPrice / count)
      }
  }
}