2016年2月12日 星期五

Spark RDD 介紹與範例指令


Apache Spark是In-Memory大數據資料運算框架,Spark的核心是RDD,Spark主要優勢是來自RDD本身的特性,要運用Spark一定要先了解RDD。
 
Spark的核心是RDDResilient Distributed Dataset)彈性分散式資料集,是由AMPLab實驗室所提出的概念,屬於一種分散式的記憶體。RDD能與其他系統相容,可以匯入外部儲存系統的資料集,例如:HDFSHBase或其他Hadoop 資料來源。



RDD的三種基本運算


RDD之上,可以施加三種類型的運算



RDD運算類型
說明
「轉換」運算Transformation
  • RDD執行「轉換」運算的結果,會產生另外一個RDD
  • 但是由於RDDlazy特性,「轉換」運算並不會立刻實際執行,它會等到執行到「動作」運算,才會實際執行。
「動作」運算Action
  • RDD執行「動作」運算後,不會產生另外一個RDD,它會產生數值、陣列或寫入檔案系統。
  • RDD執行「動作」運算時,會立刻實際執行,並且連同之前的「轉換」運算一併執行。
「持久化」Persistence
  • 對於那些會重複使用的RDD,可以將RDD「持久化」在記憶體中做為後續使用,以加快執行效能。


 Lineage機制具備容錯的特性

RDD透過「轉換(Transformation)」運算可以得出新的RDD,但Spark會延遲這個「轉換」動作的發生時間點。它並不會馬上執行,而是等到執行了Action之後,才會基於所有的RDD關係來執行轉換。



如上圖所示:
輸入資料,執行「轉換1運算產生RDD1,此時不會實際執行,只記錄操作命令

  • RDD1 執行「轉換2運算產生RDD2,此時不會實際執行,只記錄操作命令
  • RDD2 執行「轉換3運算產生RDD3,此時不會實際執行,只記錄操作命令
  • RDD2 執行「轉換4運算產生RDD4,此時不會實際執行,只記錄操作命令
  • RDD3 執行「動作1運算,此時會實際執行:「轉換1+「轉換2+「轉換3+「動作1」,產生輸出資料1
  • RDD5執行「動作2運算,此時會實際執行:「轉換1+「轉換2+「轉換4+「動作2」,產生輸出資料2

(如果你之前已經先執行「動作1運算,所以「轉換1+「轉換2」已經實際執行完成,在此只會實際執行「轉換4+「動作2」,節省執行時間)



RDD本身Lineage機制,它會記錄每個RDD與其父代RDD之間的關聯,他會紀錄透過什麼操作,才由父代RDD得到該RDD的資訊。

RDD本身immutable不可變的特性,再加上Lineage機制,使得Spark具備容錯的特性。如果某節點機器故障,儲存於節點上的RDD損毀,能重新執行一連串的「轉換」指令,產生新的輸出資料,如此就可以避免因為特定節點故障,造成整個系統無法運作的問題。

建立RDD

建立intRDD

建立RDD最簡單的方式,就是使用SparkContext 的parallelize 方法,如下列指令: 
val intRDD = sc.parallelize(List(3,1, 2, 5, 5))
intRDD.collect()
先使用val定義intRDD,然後使用parallelize方法輸入一個List的參數,以建立intRDD。不過這也是一個「轉換」運算,所以不會馬上實際執行。 必須等執行collect()後,會轉換為Array。這是一個「動作」運算所以會立刻執行
建立stringRDD
parallelize方法除了可以建立Int的RDD,也可建立String的RDD,如下列指令
val stringRDD = sc.parallelize(List("Apple", "Orange", "Banana","Grape","Apple"))
stringRDD.collect()
Map運算

map運算可以透過傳入的函數,將每一個元素經過函數運算產生另外一個RDD。如下圖所示,RDD透過傳入的函數addOne,將每一個元素加1而產生另外一個RDD。


在spark的map運算中,可以使用下列3種語法:
具名函數的寫法:
1.先定義addOne函數並傳入參數x,此函數會將x加1再回傳。
2.然後將函數名稱addOne做為參數傳入map命令,map命令會將每一個元素加1,來產生另外一個RDD。
3.不過因為map是一個「轉換」運算,所以不會馬上執行。為了方便示範,所以我們加上collect()這個「動作」運算立即執行。
4.具名函數的語法固然可以達成我們的目的,可是這種語法很麻煩。
def addOne(x :Int):Int={
return (x+1)
}
intRDD.map(addOne).collect()
匿名函數的寫法:
map運算可以使用更簡單的語法。如下列指令,map會傳入(x=>x+1)做為參數,這是lambda 語法的anonymous functions匿名函數。其中x是傳入參數,x+1是要執行的命令,告訴map運算每一個元素都要加1。匿名函數的語法命令簡潔多了,而且讓程式碼更易讀。
intRDD.map(x => x + 1).collect()
匿名函數+匿名參數的寫法:
還有更簡單的語法,使用匿名參數時可以用底線 _ 來取代參數x=>x 。這種語法固 然更簡潔,但是可能很多讀者會不習慣這種語法,可能會影響程式碼的理解,可以 視個人喜好使用。
intRDD.map(_ + 1).collect() 
執行後如下畫面,這3種語法執行結果完全相同,都是將原本(3,1, 2, 5, 5) 每一個數字都加1變成(4, 2, 3, 6, 6)。



filter字串運算
filter可以讓你對RDD內每一個元素進行篩選,並且產生另外的RDD。 例如篩選內含ra的字串
stringRDD.filter(x =>x.contains("ra") ).collect()
多個RDD「轉換」運算 
建立3個範例RDD
val intRDD1 = sc.parallelize(List(3, 1, 2, 5, 5))
val intRDD2 = sc.parallelize(List(5, 6))
val intRDD3 = sc.parallelize(List(2, 7))
union 聯集運算
intRDD1.union(intRDD2).union(intRDD3).collect()
(intRDD1 ++ intRDD2++ intRDD3).collect()
intersection 交集運算
intRDD1.intersection(intRDD2).collect()
subtract 差集運算
intRDD1.subtract(intRDD2).collect()
cartesian笛卡兒乘積運算
intRDD1.cartesian(intRDD2).collect()

統計功能

命令
說明
intRDD.stats
統計
intRDD.min
最小
intRDD.max
最大
intRDD.stdev
標準差
intRDD.count
數量
intRDD.sum
加總
intRDD.mean
平均

此圖出自Spark官網 https://spark.apache.org/


以上內容節錄自這本書,很適合Python程式設計師學習Spark機器學習與大數據架構,點選下列連結查看本書詳細介紹:
  Python+Spark 2.0+Hadoop機器學習與大數據分析實戰
  http://pythonsparkhadoop.blogspot.tw/2016/10/pythonspark-20hadoop.html

《購買本書 限時特價專區》
博客來網路書店: http://www.books.com.tw/products/0010730134?loc=P_007_090

天瓏網路書店: https://www.tenlong.com.tw/items/9864341537?item_id=1023658
  

露天拍賣:http://goods.ruten.com.tw/item/show?21640846068139
蝦皮拍賣:https://goo.gl/IEx13P 



沒有留言:

張貼留言