Apache Spark是In-Memory大數據資料運算框架,Spark的核心是RDD,Spark主要優勢是來自RDD本身的特性,要運用Spark一定要先了解RDD。
Spark的核心是RDD(Resilient Distributed Dataset)彈性分散式資料集,是由AMPLab實驗室所提出的概念,屬於一種分散式的記憶體。RDD能與其他系統相容,可以匯入外部儲存系統的資料集,例如:HDFS、HBase或其他Hadoop 資料來源。
RDD的三種基本運算
在RDD之上,可以施加三種類型的運算:
RDD運算類型
|
說明
|
「轉換」運算Transformation
|
|
「動作」運算Action
|
|
「持久化」Persistence
|
|
Lineage機制具備容錯的特性
RDD透過「轉換(Transformation)」運算可以得出新的RDD,但Spark會延遲這個「轉換」動作的發生時間點。它並不會馬上執行,而是等到執行了Action之後,才會基於所有的RDD關係來執行轉換。
如上圖所示:
輸入資料,執行「轉換1」運算產生RDD1,此時不會實際執行,只記錄操作命令
- RDD1 執行「轉換2」運算產生RDD2,此時不會實際執行,只記錄操作命令
- RDD2 執行「轉換3」運算產生RDD3,此時不會實際執行,只記錄操作命令
- RDD2 執行「轉換4」運算產生RDD4,此時不會實際執行,只記錄操作命令
- RDD3 執行「動作1」運算,此時會實際執行:「轉換1」+「轉換2」+「轉換3」+「動作1」,產生輸出資料1
- RDD5執行「動作2」運算,此時會實際執行:「轉換1」+「轉換2」+「轉換4」+「動作2」,產生輸出資料2
(如果你之前已經先執行「動作1」運算,所以「轉換1」+「轉換2」已經實際執行完成,在此只會實際執行「轉換4」+「動作2」,節省執行時間)
RDD本身Lineage機制,它會記錄每個RDD與其父代RDD之間的關聯,他會紀錄透過什麼操作,才由父代RDD得到該RDD的資訊。
RDD本身immutable不可變的特性,再加上Lineage機制,使得Spark具備容錯的特性。如果某節點機器故障,儲存於節點上的RDD損毀,能重新執行一連串的「轉換」指令,產生新的輸出資料,如此就可以避免因為特定節點故障,造成整個系統無法運作的問題。
建立RDD
建立intRDD
建立RDD最簡單的方式,就是使用SparkContext 的parallelize 方法,如下列指令:
val intRDD = sc.parallelize(List(3,1, 2, 5, 5)) intRDD.collect()先使用val定義intRDD,然後使用parallelize方法輸入一個List的參數,以建立intRDD。不過這也是一個「轉換」運算,所以不會馬上實際執行。 必須等執行collect()後,會轉換為Array。這是一個「動作」運算所以會立刻執行
建立stringRDD
parallelize方法除了可以建立Int的RDD,也可建立String的RDD,如下列指令
val stringRDD = sc.parallelize(List("Apple", "Orange", "Banana","Grape","Apple")) stringRDD.collect()Map運算
map運算可以透過傳入的函數,將每一個元素經過函數運算產生另外一個RDD。如下圖所示,RDD透過傳入的函數addOne,將每一個元素加1而產生另外一個RDD。
在spark的map運算中,可以使用下列3種語法:
具名函數的寫法:
1.先定義addOne函數並傳入參數x,此函數會將x加1再回傳。
2.然後將函數名稱addOne做為參數傳入map命令,map命令會將每一個元素加1,來產生另外一個RDD。
3.不過因為map是一個「轉換」運算,所以不會馬上執行。為了方便示範,所以我們加上collect()這個「動作」運算立即執行。
4.具名函數的語法固然可以達成我們的目的,可是這種語法很麻煩。
def addOne(x :Int):Int={ return (x+1) } intRDD.map(addOne).collect()匿名函數的寫法:
map運算可以使用更簡單的語法。如下列指令,map會傳入(x=>x+1)做為參數,這是lambda 語法的anonymous functions匿名函數。其中x是傳入參數,x+1是要執行的命令,告訴map運算每一個元素都要加1。匿名函數的語法命令簡潔多了,而且讓程式碼更易讀。
intRDD.map(x => x + 1).collect()匿名函數+匿名參數的寫法:
還有更簡單的語法,使用匿名參數時可以用底線 _ 來取代參數x=>x 。這種語法固 然更簡潔,但是可能很多讀者會不習慣這種語法,可能會影響程式碼的理解,可以 視個人喜好使用。
intRDD.map(_ + 1).collect()執行後如下畫面,這3種語法執行結果完全相同,都是將原本(3,1, 2, 5, 5) 每一個數字都加1變成(4, 2, 3, 6, 6)。
filter字串運算
filter可以讓你對RDD內每一個元素進行篩選,並且產生另外的RDD。 例如篩選內含ra的字串
stringRDD.filter(x =>x.contains("ra") ).collect()多個RDD「轉換」運算
建立3個範例RDD
val intRDD1 = sc.parallelize(List(3, 1, 2, 5, 5)) val intRDD2 = sc.parallelize(List(5, 6)) val intRDD3 = sc.parallelize(List(2, 7))union 聯集運算
intRDD1.union(intRDD2).union(intRDD3).collect() (intRDD1 ++ intRDD2++ intRDD3).collect()intersection 交集運算
intRDD1.intersection(intRDD2).collect()subtract 差集運算
intRDD1.subtract(intRDD2).collect()cartesian笛卡兒乘積運算
intRDD1.cartesian(intRDD2).collect()
統計功能
命令
|
說明
|
intRDD.stats
|
統計
|
intRDD.min
|
最小
|
intRDD.max
|
最大
|
intRDD.stdev
|
標準差
|
intRDD.count
|
數量
|
intRDD.sum
|
加總
|
intRDD.mean
|
平均
|
以上內容節錄自這本書,很適合Python程式設計師學習Spark機器學習與大數據架構,點選下列連結查看本書詳細介紹:
Python+Spark 2.0+Hadoop機器學習與大數據分析實戰
http://pythonsparkhadoop.blogspot.tw/2016/10/pythonspark-20hadoop.html
博客來網路書店: http://www.books.com.tw/products/0010730134?loc=P_007_090
天瓏網路書店: https://www.tenlong.com.tw/items/9864341537?item_id=1023658
露天拍賣:http://goods.ruten.com.tw/item/show?21640846068139
蝦皮拍賣:https://goo.gl/IEx13P
沒有留言:
張貼留言