Spark的核心是RDD(Resilient Distributed Dataset)彈性分散式資料集,是由AMPLab實驗室所提出的概念,屬於一種分散式的記憶體。Spark主要優勢是來自RDD本身的特性。RDD能與其他系統相容,可以匯入外部儲存系統的資料集,例如:HDFS、HBase或其他Hadoop 資料來源。
RDD的三種基本運算
在RDD之上,可以施加三種類型的運算:
RDD運算類型
|
說明
|
「轉換」運算Transformation
|
|
「動作」運算Action
|
|
「持久化」Persistence
|
|
Lineage機制具備容錯的特性
RDD透過「轉換(Transformation)」運算可以得出新的RDD,但Spark會延遲這個「轉換」動作的發生時間點。它並不會馬上執行,而是等到執行了Action之後,才會基於所有的RDD關係來執行轉換。
如上圖所示:
輸入資料,執行「轉換1」運算產生RDD1,此時不會實際執行,只記錄操作命令
- RDD1 執行「轉換2」運算產生RDD2,此時不會實際執行,只記錄操作命令
- RDD2 執行「轉換3」運算產生RDD3,此時不會實際執行,只記錄操作命令
- RDD2 執行「轉換4」運算產生RDD4,此時不會實際執行,只記錄操作命令
- RDD3 執行「動作1」運算,此時會實際執行:「轉換1」+「轉換2」+「轉換3」+「動作1」,產生輸出資料1
- RDD5執行「動作2」運算,此時會實際執行:「轉換1」+「轉換2」+「轉換4」+「動作2」,產生輸出資料2
(如果你之前已經先執行「動作1」運算,所以「轉換1」+「轉換2」已經實際執行完成,在此只會實際執行「轉換4」+「動作2」,節省執行時間)
RDD本身Lineage機制,它會記錄每個RDD與其父代RDD之間的關聯,他會紀錄透過什麼操作,才由父代RDD得到該RDD的資訊。
RDD本身immutable不可變的特性,再加上Lineage機制,使得Spark具備容錯的特性。如果某節點機器故障,儲存於節點上的RDD損毀,能重新執行一連串的「轉換」指令,產生新的輸出資料,如此就可以避免因為特定節點故障,造成整個系統無法運作的問題。
9.2 基本RDD「轉換」運算
Step2 建立intRDD
val intRDD = sc.parallelize(List(3,1, 2, 5, 5)) intRDD.collect()Step3 建立stringRDD
val stringRDD = sc.parallelize(List("Apple", "Orange", "Banana","Grape","Apple")) stringRDD.collect()Step4 map運算
具名函數的寫法:
def addOne(x :Int):Int={ return (x+1) } intRDD.map(addOne).collect()匿名函數的寫法:
intRDD.map(x => x + 1).collect()匿名函數+匿名參數的寫法:
intRDD.map(_ + 1).collect()Step7 filter字串運算
stringRDD.filter(x =>x.contains("ra") ).collect()Step9 randomSplit運算
val sRDD = intRDD.randomSplit(Array(0.4,0.6)) sRDD(0).collect() sRDD(1).collect()Step10 groupBy運算
val gRDD=intRDD.groupBy( x =>{ if (x % 2 == 0) "even" else "odd"} ).collect gRDD(0) gRDD(1)9.3 多個RDD「轉換」運算
Step1 建立3個範例RDD
val intRDD1 = sc.parallelize(List(3, 1, 2, 5, 5)) val intRDD2 = sc.parallelize(List(5, 6)) val intRDD3 = sc.parallelize(List(2, 7))Step2 union 聯集運算
intRDD1.union(intRDD2).union(intRDD3).collect() (intRDD1 ++ intRDD2++ intRDD3).collect()Step3 intersection 交集運算
intRDD1.intersection(intRDD2).collect()Step4 subtract 差集運算
intRDD1.subtract(intRDD2).collect()Step5 cartesian笛卡兒乘積運算
intRDD1.cartesian(intRDD2).collect()9.4 基本「動作」運算
Step1 讀取元素
intRDD.first intRDD.take(2) intRDD.takeOrdered(3) intRDD.takeOrdered(3)(Ordering[Int].reverse)Step2 統計功能
intRDD.stats intRDD.min intRDD.max intRDD.stdev intRDD.count intRDD.sum intRDD.mean9.5 RDD Key-Value 基本「轉換」運算
Step1 讀取元素
val kvRDD1 = sc.parallelize(List((3, 4), (3, 6), (5, 6), (1, 2)))Step2 列出keys值
kvRDD1.keys.collect()Step3 列出values值
kvRDD1.values.collect()Step4 使用filter篩選key運算
kvRDD1.filter { case (key, value) => key < 5 }.collectStep5 使用filter篩選value運算
kvRDD1.filter { case (key, value) => value < 5 }.collectStep6 mapValues運算
kvRDD1.mapValues(x => x * x).collectStep7 sortByKey由小至大依照key排序
kvRDD1.sortByKey(true).collect() kvRDD1.sortByKey().collect()Step8 sortByKey由大至小依照key排序
kvRDD1.sortByKey(false).collect()Step9 reduceByKey
kvRDD1.reduceByKey((x,y)=>x+y).collect kvRDD1.reduceByKey(_+_).collect9.6 多個RDD Key-Value「轉換」運算
Step1 Key-Value RDD範例
val kvRDD1 = sc.parallelize(List((3, 4), (3, 6), (5, 6), (1, 2))) val kvRDD2 = sc.parallelize(List((3, 8)))Step2 Key-Value RDD join運算
kvRDD1.join(kvRDD2).foreach(println)Step2 Key-Value RDD join運算
kvRDD1.join(kvRDD2).foreach(println)Step3 Key-Value leftOuterJoin運算
kvRDD1.leftOuterJoin(kvRDD2).foreach(println)Step4 Key-Value RDD rightOuterJoin運算
kvRDD1.rightOuterJoin(kvRDD2).foreach(println)Step5 Key-Value subtractByKey運算
kvRDD1.subtractByKey(kvRDD2).collect9.7 Key-Value 「動作」運算
Step1 Key-Value first運算
kvRDD1.first() kvRDD1.take(2)Step2 取得第1筆資料的元素
val kvFirst=kvRDD1.first kvFirst._1 kvFirst._2Step3 Key-Value countByKey運算
kvRDD1.countByKey()Step4 Key-Value collectAsMap運算
var KV=kvRDD1.collectAsMap()Step5 使用對照表轉換資料
KV(3) KV(4)Step6 Key-Value lookup運算
kvRDD1.lookup(3) kvRDD1.lookup(5)9.8 Broadcast 廣播變數
Step1 不使用Broadcast 廣播變數的範例
val kvFruit = sc.parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape"))) val fruitMap=kvFruit.collectAsMap() val fruitIds=sc.parallelize(List(2,4,1,3)) val fruitNames= fruitIds.map(x=>fruitMap(x)).collectStep2 使用Broadcast 廣播變數的範例
val kvFruit = sc.parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape"))) val fruitMap=kvFruit.collectAsMap() val bcFruitMap=sc.broadcast(fruitMap) val fruitIds=sc.parallelize(List(2,4,1,3)) val fruitNames= fruitIds.map(x=>bcFruitMap.value(x)).collect9.9 accumulator累加器
Step1 不使用Broadcast 廣播變數的範例
val intRDD = sc.parallelize(List(3,1, 2, 5, 5)) val total = sc.accumulator(0.0) val num = sc.accumulator(0) intRDD.foreach(i => { total += i num += 1 }) println("total="+total.value+", num="+num.value) val avg = total.value / num.value9.10 RDD Persistence持久化
Step2 RDD.persist()範例
val intRddMemory = sc.parallelize(List(3,1, 2, 5, 5)) intRddMemory.persist() intRddMemory.unpersist()Step3 RDD.persist設定儲存等級範例
import org.apache.spark.storage.StorageLevel val intRddMemoryAndDisk = sc.parallelize(List(3,1, 2, 5, 5)) intRddMemoryAndDisk.persist(StorageLevel.MEMORY_AND_DISK) intRddMemoryAndDisk.unpersist()9.11 使用Spark 建立WordCount
Step1 建立測試檔案
cd ~/workspace/WordCount/data gedit test.txt
Apple Apple Orange Banana Grape GrapeStep2 進入spark-shell
spark-shellStep3 執行WordCount spark命令
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt") val stringRDD=textFile.flatMap(line => line.split(" ")) val countsRDD = stringRDD.map(word => (word, 1)).reduceByKey(_ + _) countsRDD.saveAsTextFile("file:/home/hduser/workspace/WordCount/data/output") exitStep4 查看data目錄
llStep5 查看output目錄
cd output llStep6 查看part-00000輸出檔案
cat part-000009.12 Spark WordCount 詳細解說
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt") val stringRDD=textFile. flatMap(line => line.split(" ")) val countsRDD = stringRDD. map(word => (word, 1)). reduceByKey(_ + _) countsRDD.saveAsTextFile("file:/home/hduser/workspace/WordCount/data/output")Step1 sc.textFile讀取檔案
val textFile = sc.textFile("file:/home/hduser/workspace/WordCount/data/test.txt") textFile.foreach(println)Step2 flatMap取出每一個字
val stringRDD=textFile.flatMap(line => line.split(" ")) stringRDD.collectStep3 flatMap與map的差異
textFile.map(line =>line.split(" ")).collect textFile.flatMap(line => line.split(" ")).collect
想要了解看更多有關Spark RDD資訊,書中有詳細介紹
http://www.books.com.tw/products/0010695285?loc=P_012_0_201
此圖出自Spark官網 https://spark.apache.org/
0 意見:
張貼留言