2015年9月18日 星期五

第9章. Spark RDD介紹與範例指令


Spark的核心是RDDResilient Distributed Dataset)彈性分散式資料集,是由AMPLab實驗室所提出的概念,屬於一種分散式的記憶體。Spark主要優勢是來自RDD本身的特性。RDD能與其他系統相容,可以匯入外部儲存系統的資料集,例如:HDFSHBase或其他Hadoop 資料來源。



RDD的三種基本運算


RDD之上,可以施加三種類型的運算
RDD運算類型
說明
「轉換」運算Transformation
  • RDD執行「轉換」運算的結果,會產生另外一個RDD
  • 但是由於RDDlazy特性,「轉換」運算並不會立刻實際執行,它會等到執行到「動作」運算,才會實際執行。
「動作」運算Action
  • RDD執行「動作」運算後,不會產生另外一個RDD,它會產生數值、陣列或寫入檔案系統。
  • RDD執行「動作」運算時,會立刻實際執行,並且連同之前的「轉換」運算一併執行。
「持久化」Persistence
  • 對於那些會重複使用的RDD,可以將RDD「持久化」在記憶體中做為後續使用,以加快執行效能。


 Lineage機制具備容錯的特性

RDD透過「轉換(Transformation)」運算可以得出新的RDD,但Spark會延遲這個「轉換」動作的發生時間點。它並不會馬上執行,而是等到執行了Action之後,才會基於所有的RDD關係來執行轉換。



如上圖所示:
輸入資料,執行「轉換1運算產生RDD1,此時不會實際執行,只記錄操作命令

  • RDD1 執行「轉換2運算產生RDD2,此時不會實際執行,只記錄操作命令
  • RDD2 執行「轉換3運算產生RDD3,此時不會實際執行,只記錄操作命令
  • RDD2 執行「轉換4運算產生RDD4,此時不會實際執行,只記錄操作命令
  • RDD3 執行「動作1運算,此時會實際執行:「轉換1+「轉換2+「轉換3+「動作1」,產生輸出資料1
  • RDD5執行「動作2運算,此時會實際執行:「轉換1+「轉換2+「轉換4+「動作2」,產生輸出資料2

(如果你之前已經先執行「動作1運算,所以「轉換1+「轉換2」已經實際執行完成,在此只會實際執行「轉換4+「動作2」,節省執行時間)



RDD本身Lineage機制,它會記錄每個RDD與其父代RDD之間的關聯,他會紀錄透過什麼操作,才由父代RDD得到該RDD的資訊。

RDD本身immutable不可變的特性,再加上Lineage機制,使得Spark具備容錯的特性。如果某節點機器故障,儲存於節點上的RDD損毀,能重新執行一連串的「轉換」指令,產生新的輸出資料,如此就可以避免因為特定節點故障,造成整個系統無法運作的問題。

9.2 基本RDD「轉換」運算
Step2 建立intRDD
val intRDD = sc.parallelize(List(3,1, 2, 5, 5))
intRDD.collect()

Step3 建立stringRDD
val stringRDD = sc.parallelize(List("Apple", "Orange", "Banana","Grape","Apple"))
stringRDD.collect()
Step4 map運算
具名函數的寫法:
def addOne(x :Int):Int={
return (x+1)
}
intRDD.map(addOne).collect()
匿名函數的寫法:
intRDD.map(x => x + 1).collect()
匿名函數+匿名參數的寫法:
intRDD.map(_ + 1).collect()
Step7 filter字串運算
stringRDD.filter(x =>x.contains("ra") ).collect()
Step9 randomSplit運算
val sRDD = intRDD.randomSplit(Array(0.4,0.6))
sRDD(0).collect()
sRDD(1).collect()
Step10 groupBy運算
val gRDD=intRDD.groupBy(
x =>{ if (x % 2 == 0) "even" else "odd"}
).collect
gRDD(0)
gRDD(1)
9.3 多個RDD「轉換」運算
Step1 建立3個範例RDD
val intRDD1 = sc.parallelize(List(3, 1, 2, 5, 5))
val intRDD2 = sc.parallelize(List(5, 6))
val intRDD3 = sc.parallelize(List(2, 7))
Step2 union 聯集運算
intRDD1.union(intRDD2).union(intRDD3).collect()
(intRDD1 ++ intRDD2++ intRDD3).collect()
Step3 intersection 交集運算
intRDD1.intersection(intRDD2).collect()
Step4 subtract 差集運算
intRDD1.subtract(intRDD2).collect()
Step5 cartesian笛卡兒乘積運算
intRDD1.cartesian(intRDD2).collect()
9.4 基本「動作」運算
Step1 讀取元素
intRDD.first
intRDD.take(2)
intRDD.takeOrdered(3)
intRDD.takeOrdered(3)(Ordering[Int].reverse)
Step2 統計功能
intRDD.stats
intRDD.min
intRDD.max
intRDD.stdev
intRDD.count
intRDD.sum
intRDD.mean
9.5 RDD Key-Value 基本「轉換」運算
Step1 讀取元素
val kvRDD1 = sc.parallelize(List((3, 4), (3, 6), (5, 6), (1, 2)))
Step2 列出keys值
kvRDD1.keys.collect()
Step3 列出values值
kvRDD1.values.collect()
Step4 使用filter篩選key運算
kvRDD1.filter { case (key, value) => key < 5 }.collect
Step5 使用filter篩選value運算
kvRDD1.filter { case (key, value) => value < 5 }.collect
Step6 mapValues運算
kvRDD1.mapValues(x => x * x).collect
Step7 sortByKey由小至大依照key排序
kvRDD1.sortByKey(true).collect()
kvRDD1.sortByKey().collect()
Step8 sortByKey由大至小依照key排序
kvRDD1.sortByKey(false).collect()
Step9 reduceByKey
kvRDD1.reduceByKey((x,y)=>x+y).collect
kvRDD1.reduceByKey(_+_).collect
9.6 多個RDD Key-Value「轉換」運算
Step1 Key-Value RDD範例
val kvRDD1 = sc.parallelize(List((3, 4), (3, 6), (5, 6), (1, 2)))
val kvRDD2 = sc.parallelize(List((3, 8)))
Step2 Key-Value RDD join運算
kvRDD1.join(kvRDD2).foreach(println)
Step2 Key-Value RDD join運算
kvRDD1.join(kvRDD2).foreach(println)
Step3 Key-Value leftOuterJoin運算
kvRDD1.leftOuterJoin(kvRDD2).foreach(println)
Step4 Key-Value RDD rightOuterJoin運算
kvRDD1.rightOuterJoin(kvRDD2).foreach(println)
Step5 Key-Value subtractByKey運算
kvRDD1.subtractByKey(kvRDD2).collect
9.7 Key-Value 「動作」運算
Step1 Key-Value first運算
kvRDD1.first()
kvRDD1.take(2)
Step2 取得第1筆資料的元素
val kvFirst=kvRDD1.first
kvFirst._1
kvFirst._2
Step3 Key-Value countByKey運算
kvRDD1.countByKey()
Step4 Key-Value collectAsMap運算
var KV=kvRDD1.collectAsMap()
Step5 使用對照表轉換資料
KV(3)
KV(4)
Step6 Key-Value lookup運算
kvRDD1.lookup(3)
kvRDD1.lookup(5)
9.8 Broadcast 廣播變數
Step1 不使用Broadcast 廣播變數的範例
val kvFruit = sc.parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
val fruitMap=kvFruit.collectAsMap()
val fruitIds=sc.parallelize(List(2,4,1,3))
val fruitNames= fruitIds.map(x=>fruitMap(x)).collect
Step2 使用Broadcast 廣播變數的範例
val kvFruit = sc.parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
val fruitMap=kvFruit.collectAsMap()
val bcFruitMap=sc.broadcast(fruitMap)
val fruitIds=sc.parallelize(List(2,4,1,3))
val fruitNames= fruitIds.map(x=>bcFruitMap.value(x)).collect
9.9 accumulator累加器
Step1 不使用Broadcast 廣播變數的範例
val intRDD = sc.parallelize(List(3,1, 2, 5, 5))
val total = sc.accumulator(0.0)
val num = sc.accumulator(0)
intRDD.foreach(i => {
total += i
num += 1
})
println("total="+total.value+", num="+num.value)
val avg = total.value / num.value
9.10 RDD Persistence持久化
Step2 RDD.persist()範例
val intRddMemory = sc.parallelize(List(3,1, 2, 5, 5))
intRddMemory.persist()
intRddMemory.unpersist()
Step3 RDD.persist設定儲存等級範例
import org.apache.spark.storage.StorageLevel
val intRddMemoryAndDisk = sc.parallelize(List(3,1, 2, 5, 5))
intRddMemoryAndDisk.persist(StorageLevel.MEMORY_AND_DISK)
intRddMemoryAndDisk.unpersist()
9.11 使用Spark 建立WordCount
Step1 建立測試檔案
cd ~/workspace/WordCount/data
gedit test.txt
Apple Apple Orange
Banana Grape Grape
Step2 進入spark-shell
spark-shell
Step3 執行WordCount spark命令
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt")
val stringRDD=textFile.flatMap(line => line.split(" "))
val countsRDD = stringRDD.map(word => (word, 1)).reduceByKey(_ + _)
countsRDD.saveAsTextFile("file:/home/hduser/workspace/WordCount/data/output")
exit
Step4 查看data目錄
ll
Step5 查看output目錄
cd output
ll
Step6 查看part-00000輸出檔案
cat part-00000
9.12 Spark WordCount 詳細解說
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt")
val stringRDD=textFile.
    flatMap(line => line.split(" "))

val countsRDD = stringRDD.
                map(word => (word, 1)).
                reduceByKey(_ + _)
countsRDD.saveAsTextFile("file:/home/hduser/workspace/WordCount/data/output")
Step1 sc.textFile讀取檔案
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt")
textFile.foreach(println)
Step2 flatMap取出每一個字
val stringRDD=textFile.flatMap(line => line.split(" "))
stringRDD.collect
Step3 flatMap與map的差異
textFile.map(line =>line.split(" ")).collect
textFile.flatMap(line => line.split(" ")).collect


 
 想要了解看更多有關Spark RDD資訊,書中有詳細介紹
http://www.books.com.tw/products/0010695285?loc=P_012_0_201


 

此圖出自Spark官網 https://spark.apache.org/

沒有留言:

張貼留言