Classic MapReduce Problems and Their Spark Implementations

The MapReduce computing paradigm underpins most of today's distributed algorithm implementations. It is essential knowledge for algorithm engineers and a staple of "build-a-rocket-in-the-interview" questions. This article walks through the classic MapReduce algorithm scenarios and implements each of them with Spark.

package org.shiningsand

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer
import scala.util.Random


/**
  * @Author: YangLei
  * @Description:
  * @Date: Create in 11:18 AM 2019/4/24
  * @update in 11:18 AM 2019/4/24
  */
object ClassicMRProblem extends Logging{
  val ss = SparkSession.builder()
    .appName("mr-implementation")
    .master("local[*]")
    .config(sparkConfig)
    .getOrCreate()
  ss.sparkContext.setLogLevel("error")
  val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {
    topN()
  }
  def maxMin(): Unit = {


    // Initialize the test data (the number of partitions can be specified explicitly)
    val data = sc.parallelize(Array(10, 17, 3, 4, 5, 6, 7, 8, 1001, 6, 2, 100, 2000, 30, 40), 2)
    log.error("NumPartitions:: " + data.getClass.toString + " " + data.getNumPartitions.toString)
    val data2 = sc.parallelize(Array(10, 17, 3, 4, 5, 6, 7, 8, 1001, 6, 2, 100, 2000, 30, 40))
    log.error("NumPartitions:: " + data2.getClass.toString + " " + data2.getNumPartitions.toString)

    /**
      * Using reduce
      * def reduce(f: (T, T) ⇒ T): T
      * Reduces the elements of this RDD using the specified commutative and associative binary operator.
      * reduce is a core RDD action and is used very frequently in data processing.
      * Processing logic:
      * each partition is processed sequentially element by element
      * multiple partitions can be processed at the same time either by a single worker
      * (multiple executor threads) or different workers
      * partial results are fetched to the driver where the final reduction is applied
      */

    val max = data2.reduce((a,b) => Math.max(a,b))
    val min = data2.reduce((a,b) => Math.min(a,b))
    println("max : " + max)
    println("min : " + min)


    /**
      * Using reduceByKey
      * reduceByKey is a PairRDD operation and gives finer-grained control over how the data is partitioned.
      *
      * def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      * combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
      * }
      *
      * /**
      * * Merge the values for each key using an associative and commutative reduce function. This will
      * * also perform the merging locally on each mapper before sending results to a reducer, similarly
      * * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
      **/
      * def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
      * reduceByKey(new HashPartitioner(numPartitions), func)
      * }
      */

    /**
      * Spark uses HashPartitioner by default; RangePartitioner and custom partitioners are also
      * available (see the custom-partitioner sketch below).
      * https://blog.csdn.net/high2011/article/details/68491115
      */
    // Assign each value a key in "a".."e" based on value % 5, turning the RDD into a PairRDD
    def putKey(iter: Iterator[Int]): Iterator[(String, Int)] = {
      var res = List[(String, Int)]()
      while (iter.hasNext) {
        val cur = iter.next()
        val newValue = ((cur % 5 + 'a').toChar.toString, cur)
        res = newValue :: res
      }
      res.iterator
    }
    val data3 = data2.mapPartitions(putKey)

    data3.groupByKey().foreachPartition { p =>
      println("partition: " + p.mkString)
    }
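
    // Sketch of the custom-partitioner option mentioned above (the two-way split on key "a" is an
    // arbitrary choice for illustration); a RangePartitioner could be plugged in the same way.
    import org.apache.spark.Partitioner
    val customPartitioned = data3.partitionBy(new Partitioner {
      override def numPartitions: Int = 2
      override def getPartition(key: Any): Int = if (key == "a") 0 else 1
    })
    log.error("custom partitioner NumPartitions:: " + customPartitioned.getNumPartitions)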


    val maxByKey = data3.reduceByKey((a,b) => Math.max(a,b)).map(r=>("last",r._2))
      .reduceByKey((a,b) => Math.max(a,b))
    val minByKey = data3.reduceByKey((a,b) => Math.min(a,b)).map(r=>("last",r._2))
        .reduceByKey((a,b) => Math.min(a,b))

    println()
    println("maxByKey : " + maxByKey.map(_._2).collect().mkString)
    println("minByKey : " + minByKey.map(_._2).collect().mkString)

    /**
      * /**
      * * Aggregate the elements of each partition, and then the results for all the partitions, using
      * * given combine functions and a neutral "zero value". This function can return a different result
      * * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
      * * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
      * * allowed to modify and return their first argument instead of creating a new U to avoid memory
      * * allocation.
      * *
      * * @param zeroValue the initial value for the accumulated result of each partition for the
      * *                  `seqOp` operator, and also the initial value for the combine results from
      * *                  different partitions for the `combOp` operator - this will typically be the
      * *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
      * * @param seqOp an operator used to accumulate results within a partition
      * * @param combOp an associative operator used to combine results from different partitions
      **/
      * def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
      *
      * The aggregate method: note that the result type U may differ from the RDD element type T.
      *
      */
    // aggregate: compute the global max over the values, ignoring the keys
    val agrMax = data3.aggregate(Int.MinValue)((maxValue, num) => Math.max(maxValue, num._2),
      (maxV1, maxV2) => Math.max(maxV1, maxV2))
    println("agrMax: " + agrMax)

    // aggregateByKey: per-key max, written out to 4 partitions
    val agrMaxByKey = data3.aggregateByKey(Int.MinValue, 4)((maxValue, num) => Math.max(maxValue, num),
      (maxV1, maxV2) => Math.max(maxV1, maxV2)
    )

    agrMaxByKey.foreach(print(_))
    println()

    val agrMaxKey = agrMaxByKey.map(r=>("last",r._2))
      .reduceByKey((a,b) => Math.max(a,b)).map(_._2).collect().mkString

    println("maxByKey : " + agrMaxKey)

    sc.stop
  }



  def deduplication(): Unit ={

    /**
      * De-duplication with reduceByKey
      */
    val duplicateData = sc.parallelize(Array(("2012-3-1","a"),("2012-3-1","a"),("2012-3-2","c"),("2012-3-2","b"),("2012-3-3","c")))

    println(duplicateData.map(p =>(p,1)).reduceByKey(_+_).keys.collect().mkString)

    // sortByKey returns a new, sorted RDD; an action is still needed to materialize the result
    println(duplicateData.sortByKey().collect().mkString)
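
    // Alternative sketch: RDD.distinct() gives the same result; internally it is essentially the
    // same map-to-(record, count) + reduceByKey pattern used above.
    println(duplicateData.distinct().sortByKey().collect().mkString)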
  }


  def topN(): Unit ={
    val data = sc.parallelize(Array(
      ("uuid_1","app_1",78),
      ("uuid_1","app_2",90),
      ("uuid_1","app_3",200),
      ("uuid_2","app_1",3),
      ("uuid_2","app_2",7),
      ("uuid_3","app_1",178),
      ("uuid_3","app_2",28),
      ("uuid_3","app_3",43),
      ("uuid_3","app_4",2),
      ("uuid_4","app_1",0),
      ("uuid_4","app_2",6),
      ("uuid_4","app_3",743),
      ("uuid_4","app_4",34)
    )
    )
    // Salt each uuid with a random prefix in [0, 3) so that a hot key is spread across several groups
    val data1 = data.map(p => ((Random.nextInt(3), p._1), (p._2, p._3))).groupByKey()
    data1.foreach(p => println(p))
    println("==============")
    // Two-stage TopN: take the per-(salt, uuid) top 1, then drop the salt and merge the partial lists per uuid
    val data2 = data1.map(p => (p._1._2, p._2.toList.sortWith(_._2 > _._2).take(1)))
      .reduceByKey((a, b) => (a ++ b).sortWith(_._2 > _._2).take(1))
    data2.foreach(p => println(p))
    println(data2.collect().mkString)


    val topNResult3 = data.map(p => (p._1, (p._2, p._3))).aggregateByKey(ArrayBuffer[(String, Int)]())(
      (array, num) => {
        // Within a partition: append the record, then keep only the 2 largest values
        array += num
        array.sortWith(_._2 > _._2).take(2)
      },
      (u1, u2) => {
        // Merge two partial aggregates; this can happen both at the combiner stage
        // and in the final aggregation after the shuffle
        (u1 ++= u2).sortWith(_._2 > _._2).take(2)
      }
    ).map(p => (p._1, p._2.toList))

    println("+---+---+ 使用aggregateByKey获取TopN的结果:")
    topNResult3.foreach(println(_))
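
    // Alternative sketch using the DataFrame/Window API for the same per-uuid TopN; the column
    // names "uuid", "app" and "cnt" are chosen here purely for illustration.
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}
    import ss.implicits._
    val w = Window.partitionBy("uuid").orderBy(col("cnt").desc)
    data.toDF("uuid", "app", "cnt")
      .withColumn("rank", row_number().over(w))
      .filter(col("rank") <= 2)
      .show()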

  }

  def sparkConfig: SparkConf = {
    val conf = new SparkConf()
    conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "8000")
    conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    conf.set("spark.streaming.backpressure.enabled", "true")
    conf.set("hive.exec.dynamic.partition", "true")
    conf.set("spark.scheduler.mode", "FIFO")
    conf.set("spark.streaming.concurrentJobs", "100")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.debug.maxToStringField", "1000")
    conf.set("spark.rdd.compress", "true")
    conf.set("spark.streaming.kafka.consumer.cache.enabled", "false")
    conf.set("spark.storage.memoryFraction", "0.3")
  }
}