Classic MapReduce Problems and Their Spark Implementations
The MapReduce computing paradigm underlies most of today's distributed computing implementations. It is core knowledge for algorithm engineers and a staple of "rocket-building" interview questions. This article walks through the classic MapReduce algorithm scenarios and implements each of them with Spark.
package org.shiningsand
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* @Author: YangLei
* @Description:
* @Date: Created at 11:18 AM 2019/4/24
* @update: 11:18 AM 2019/4/24
*/
object ClassicMRProblem extends Logging{
val ss = SparkSession.builder()
.appName("mr-implementation")
.master("local[*]")
.config(sparkConfig)
.getOrCreate()
ss.sparkContext.setLogLevel("error")
val sc = ss.sparkContext
def main(args: Array[String]): Unit = {
topN()
}
def maxMin(): Unit = {
// Initialize test data (the number of partitions can be specified explicitly)
val data = sc.parallelize(Array(10,17,3,4,5,6,7,8,1001,6,2,100,2000,30,40),2);
log.error("NumPartitions:: "+data.getClass.toString+" "+data.getNumPartitions.toString)
val data2 = sc.parallelize(Array(10,17,3,4,5,6,7,8,1001,6,2,100,2000,30,40));
log.error("NumPartitions:: "+data2.getClass.toString+" "+data2.getNumPartitions.toString)
/**
* Using reduce
* def reduce(f: (T, T) ⇒ T): T
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
* reduce is a high-level operator on the RDD API and is used very frequently in data processing.
* Processing logic:
* each partition is processed sequentially element by element
* multiple partitions can be processed at the same time either by a single worker
* (multiple executor threads) or different workers
* partial results are fetched to the driver where the final reduction is applied
*/
val max = data2.reduce((a,b) => Math.max(a,b))
val min = data2.reduce((a,b) => Math.min(a,b))
println("max : " + max)
println("min : " + min)
/**
* Using reduceByKey
* reduceByKey is a PairRDD operation and gives finer control over how the data is partitioned.
*
* def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
* combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
* }
*
* /**
* * Merge the values for each key using an associative and commutative reduce function. This will
* * also perform the merging locally on each mapper before sending results to a reducer, similarly
* * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
**/
* def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
* reduceByKey(new HashPartitioner(numPartitions), func)
* }
*/
/**
* Spark partitions pair RDDs with HashPartitioner by default; RangePartitioner or a custom
* Partitioner can also be used.
* https://blog.csdn.net/high2011/article/details/68491115
*/
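/**
* A minimal sketch of a custom partitioner, assuming we simply want to route keys by hash code
* (essentially what HashPartitioner already does). ModPartitioner is an illustrative name and
* is not used by the code below.
*/
class ModPartitioner(parts: Int) extends org.apache.spark.Partitioner {
override def numPartitions: Int = parts
override def getPartition(key: Any): Int = {
val raw = key.hashCode % parts
if (raw < 0) raw + parts else raw // keep the partition index non-negative
}
}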
// Assign each value a letter key in 'a'..'e' based on value % 5, producing a pair RDD
def putKey(iter: Iterator[Int]): Iterator[(String,Int)]= {
var res = List[(String,Int)]()
while (iter.hasNext) {
val cur = iter.next;
val new_value = ((cur%5+'a').toChar.toString,cur)
res = new_value :: res
}
res.iterator
}
val data3 = data2.mapPartitions(putKey)
data3.groupByKey().foreachPartition { p =>
println("分区"+p.toIterator.mkString)
}
val maxByKey = data3.reduceByKey((a,b) => Math.max(a,b)).map(r=>("last",r._2))
.reduceByKey((a,b) => Math.max(a,b))
val minByKey = data3.reduceByKey((a,b) => Math.min(a,b)).map(r=>("last",r._2))
.reduceByKey((a,b) => Math.min(a,b))
println()
println("maxByKey : " + maxByKey.map(_._2).collect().mkString)
println("minByKey : " + minByKey.map(_._2).collect().mkString)
/**
* /**
* * Aggregate the elements of each partition, and then the results for all the partitions, using
* * given combine functions and a neutral "zero value". This function can return a different result
* * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* * allowed to modify and return their first argument instead of creating a new U to avoid memory
* * allocation.
* *
* * @param zeroValue the initial value for the accumulated result of each partition for the
* * `seqOp` operator, and also the initial value for the combine results from
* * different partitions for the `combOp` operator - this will typically be the
* * neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* * @param seqOp an operator used to accumulate results within a partition
* * @param combOp an associative operator used to combine results from different partitions
**/
* def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
*
* Using aggregate: seqOp folds the elements of each partition, and combOp merges the
* per-partition results on the driver.
*
*/
val agrMax = data3.aggregate(Integer.MIN_VALUE)((maxValue,num) =>Math.max(maxValue,num._2),
(maxV1,maxV2)=> Math.max(maxV1,maxV2))
println("agrMax: "+agrMax)
// aggregateByKey: the same idea applied per key (here the result is hash-partitioned into 4 partitions)
val agrMaxByKey = data3.aggregateByKey(Integer.MIN_VALUE,4)((maxValue,num) =>{Math.max(maxValue,num)},
(maxV1,maxV2)=> Math.max(maxV1,maxV2)
)
agrMaxByKey.foreach(print(_))
println()
val agrMaxKey = agrMaxByKey.map(r=>("last",r._2))
.reduceByKey((a,b) => Math.max(a,b)).map(_._2).collect().mkString
println("maxByKey : " + agrMaxKey)
sc.stop
}
def deduplication(): Unit ={
/**
* Deduplicate with reduceByKey: map each record to (record, 1), reduce by key, and keep only the keys.
*/
val duplicateData = sc.parallelize(Array(("2012-3-1","a"),("2012-3-1","a"),("2012-3-2","c"),("2012-3-2","b"),("2012-3-3","c")))
println(duplicateData.map(p =>(p,1)).reduceByKey(_+_).keys.collect().mkString)
// sortByKey returns a new RDD; collect and print it so the call is not a silent no-op
println(duplicateData.sortByKey().collect().mkString)
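// For reference, RDD.distinct() implements essentially the same map -> reduceByKey -> keys pattern internally:
println(duplicateData.distinct().collect().mkString)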
}
def topN(): Unit ={
val data = sc.parallelize(Array(
("uuid_1","app_1",78),
("uuid_1","app_2",90),
("uuid_1","app_3",200),
("uuid_2","app_1",3),
("uuid_2","app_2",7),
("uuid_3","app_1",178),
("uuid_3","app_2",28),
("uuid_3","app_3",43),
("uuid_3","app_4",2),
("uuid_4","app_1",0),
("uuid_4","app_2",6),
("uuid_4","app_3",743),
("uuid_4","app_4",34)
)
)
// Salt the key with a random prefix in [0, 3) before grouping, so that a heavily skewed uuid
// gets split across several groups (a common two-stage top-N / skew-mitigation trick)
val data1 = data.map(p=>((Random.nextInt(3),p._1),(p._2,p._3))).groupByKey()
data1.foreach(p =>println(p))
println("==============")
// val data2 = data1.flatMap({p=>(p._1._2,p._2.toList.sortWith(_._2>_._2).take(1))})
// data2.foreach(p =>println(p))
// print(data2.collect().mkString)
val topNResult3 = data.map(p=>(p._1,(p._2,p._3))).aggregateByKey(ArrayBuffer[(String,Int)]())(
(array, num) => {
// accumulate within a partition, keeping only the two largest values per key
array += num
array.sortWith(_._2 > _._2).take(2)
},
(u1, u2) => {
// merge two partial aggregates; this can happen on the map side (combiner) as well as
// in the final aggregation after the shuffle
u1 ++= u2
u1.sortWith(_._2 > _._2).take(2)
}
).map(p => (p._1, p._2.toList))
println("+---+---+ 使用aggregateByKey获取TopN的结果:")
topNResult3.foreach(println(_))
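/**
* For a global (not per-key) top N, takeOrdered is usually enough; a minimal sketch that takes
* the two largest usage values across all records (illustrative only):
*/
val globalTop2 = data.map(_._3).takeOrdered(2)(Ordering[Int].reverse)
println("global top2: " + globalTop2.mkString(","))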
}
def sparkConfig: SparkConf = {
val conf = new SparkConf()
conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
conf.set("spark.streaming.kafka.maxRatePerPartition", "8000")
conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("hive.exec.dynamic.partition", "true")
conf.set("spark.scheduler.mode", "FIFO")
conf.set("spark.streaming.concurrentJobs", "100")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.debug.maxToStringField", "1000")
conf.set("spark.rdd.compress", "true")
conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
conf.set("spark.storage.memoryFraction", "0.3")
}
}