Friday, June 17, 2016

Spark Training Class Notes
  • Course files on GitHub


$ cp   /opt/spark-1.5.1-bin-hadoop2.6.tgz   ~

$ tar  zxvf  spark-1.5.1-bin-hadoop2.6.tgz

$ mv  spark-1.5.1-bin-hadoop2.6  spark

Testing the shells
Launch an interactive shell:
$  ~/spark/bin/spark-shell
$  ~/spark/bin/pyspark

Note:
  • pyspark has no tab completion; need to look into why it is missing

Functional Programming
  • Pass a FUNCTION (what to do) as a method parameter, rather than an OBJECT (what it is)
    • You pass in the thing you want done, not an object (see the sketch after this list)
  • Scala: an FP language based on the JVM
  • Java: supports FP since Java 8
  • Python: supports FP
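
A small Scala illustration of passing behavior as a function parameter (not from the course material, just a sketch):

// applyTwice takes the behavior itself (f) as an argument, not an object
def applyTwice(f: Int => Int, x: Int): Int = f(f(x))

applyTwice(n => n + 1, 5)   // returns 7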

Running a Spark application in local mode inside the IDE
  • In the code, use .setMaster("local") to run locally (a standalone Scala sketch follows)
  • SparkConf conf = new SparkConf().setAppName("HelloWorld").setMaster("local");
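
A minimal standalone Scala sketch of the same idea (the object name and the trivial job are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    // "local" runs the driver and executor in a single JVM, e.g. inside the IDE
    val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())   // trivial job just to exercise the context
    sc.stop()
  }
}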

== Lunch break ==

If there is not enough memory to cache RDD partitions (see the example after this list):
  • MEMORY_ONLY
    • The oldest partitions are dropped, then recomputed if necessary
  • MEMORY_AND_DISK
    • Partitions are spilled to disk and read back when necessary
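
A quick spark-shell sketch of picking a storage level (the RDD here is only illustrative):

scala> import org.apache.spark.storage.StorageLevel
scala> val big = sc.parallelize( 1 to 1000000 )
scala> big.persist(StorageLevel.MEMORY_AND_DISK)
scala> big.count()
  • persist() only marks the storage level; the first action (count here) actually materializes the cache
  • rdd.cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)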

RDD Types (see the sketch after this list)
  • Basic RDD[T]
    • Treats each data item as a single value
    • Can be converted to the other RDD types
  • PairRDD
    • Each data item is a key/value pair
  • DoubleRDD
    • Data items are convertible to the Scala data type Double
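
A short spark-shell sketch of how the three flavours show up (data and values chosen only for illustration):

scala> val nums = sc.parallelize( 1 to 10 )            // basic RDD[Int]
scala> val pairs = nums.map( x => (x % 2, x) )         // key/value pairs, so pair RDD operations become available
scala> pairs.reduceByKey( (a,b) => a + b ).collect()   // e.g. Array((0,30), (1,25))
scala> nums.map( _.toDouble ).mean()                   // Double values, so DoubleRDD operations like mean() work (5.5)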


Creating an RDD from a collection
Start spark-shell:

scala>  sc.parallelize( 1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:16

scala> sc.parallelize(Array("1","2","3"))
res1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at
<console>:16

scala> res0.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> res1.collect()
res5: Array[String] = Array(1, 2, 3)

Notes:
  • If you don't bind the result to a val, it is assigned to a default name such as res0

scala> val a = sc.parallelize( 1 to 20 )
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:15

scala> a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)


RDD.map( func )
  • map applies the same operation to every element in the RDD

scala> a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> a.map( x => x + 1)
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:18

scala> a.collect()
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> res7.collect()
res9: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)


RDD.filter( func )
  • Keeps only the elements that satisfy the given condition

scala> a.collect()
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> a.filter( x => x != 1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:18

scala> res11.collect()
res12: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

RDD.flatMap( func )
  • flatMap applies the same operation to every element in the RDD, then flattens the results into a single collection

Create an RDD containing 1 to 10:
scala> val rdd = sc.parallelize( 1 to 10 )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:15

Use .flatMap to list the range from 1 up to each element.
  • Concretely, this produces 1 to 1, 1 to 2, 1 to 3 ... up to 1 to 10, flattened into a single collection
scala> rdd.flatMap( x => 1 to x ).collect()
res20: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Going the other direction:
scala> rdd.flatMap( x => x to 3 ).collect()
res21: Array[Int] = Array(1, 2, 3, 2, 3, 3)


RDD.reduce( func )

scala> var rdd1 = sc.makeRDD( 1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:15
  • The trailing 3 specifies 3 partitions

This reduce sums up the contents, so 1 through 10 adds up to 55:
scala> rdd1.reduce( (a,b) => a+b )
res22: Int = 55

Note that when an RDD is split across multiple partitions, the order of execution is not guaranteed. Numeric addition is commutative, so the sum is unaffected, but with strings you can get unexpected results.

scala> var range = ( 'a' to 'z' ).map(_.toString)
range: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)

scala> val rdd = sc.parallelize( range, 3 )
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:17

scala> rdd.reduce( (a,b) => a+b)
res23: String = abcdefghrstuvwxyzijklmnopq
  • This happens because the results from the 3 partitions are not necessarily combined in order

scala> rdd.partitions.size
res24: Int = 3


RDD.fold(zero)(func)
  • A generalized form of reduce()
  • The zero value is applied within every partition
  • The zero value is also applied when merging the partition results (i.e., it is added once more at the final merge)
  • Think of it as a prefix or a default value

scala> var rdd1 = sc.makeRDD( 1 to 10, 3 )
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:15

scala> rdd1.fold(0)( (a,b) => a+b )
res25: Int = 55

scala> rdd1.fold(1)( (a,b) => a+b )
res26: Int = 59
  • Why the result is 59
    • 1 through 10 sums to 55
    • the 3 partitions contribute 1 + 1 + 1 (second point above)
    • merging the partition results adds one more 1 (third point above)
    • 59 = 55 + 3 + 1

scala> val range = ( 'a' to 'z').map(_.toString)
range: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)

scala> val rdd = sc.parallelize(range,3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:17

scala> rdd.fold("1")( (a,b) => a+b )
res27: String = 11rstuvwxyz1abcdefgh1ijklmnopq
  • Each of the 3 partition results is prefixed with "1", and the final merge adds one more leading "1"
