使用 Spark Shell

基礎

Spark 的 shell 作為一個强大的交互式數據分析工具,提供了一個簡單的方式來學習 API。它可以使用 Scala(在 Java 虛擬機上運行現有的 Java 庫的一个很好方式) 或 Python。在 Spark 目錄裡使用下面的方式開始運行:

./bin/spark-shell

Spark 最主要的抽象是叫Resilient Distributed Dataset(RDD) 的彈性分布式資料集。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)創建,也可以從其他的 RDDs 轉換。讓我們在 Spark 源代碼目錄從 README 文本文件中創建一個新的 RDD。

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDD 的 actions 從 RDD 中返回值,transformations 可以轉換成一個新 RDD 並返回它的引用。讓我們開始使用幾個操作:

scala> textFile.count() // RDD 的數據行數
res0: Long = 126

scala> textFile.first() // RDD 的第一行數據
res1: String = # Apache Spark

現在讓我們使用一個 transformation,我們將使用 filter 在這個文件裡返回一個包含子數據集的新 RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

我們可以把 actions 和 transformations 連接在一起:

scala> textFile.filter(line => line.contains("Spark")).count() // 有多少行包括 "Spark"?
res3: Long = 15

更多 RDD 操作

RDD actions 和 transformations 能被用在更多的複雜計算中。比方說,我們想要找到一行中最多的單詞數量:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

首先將行映射(map)成一個整數產生一個新 RDD。 在這個新的 RDD 上調用 reduce 找到行中最大的個數。 mapreduce 的參數是 Scala 的函數串(closures),並且可以使用任何語言特性或者 Scala/Java 函式庫。例如,我们可以很方便地調用其他的函數。 我們使用 Math.max() 函數讓代碼更容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoop 流行的一個通用的數據流(data flow)模式是 MapReduce。Spark 能很容易地實現 MapReduce:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

這裡,我們結合 flatMap, mapreduceByKey 來計算文件裡每個單詞出現的數量,它的結果是包含一組(String, Int) 鍵值對的 RDD。我们可以使用 [collect] 操作在我們的 shell 中收集單詞的數量:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

暫存(Caching)

Spark 支持把數據集拉到集群內的暫存記憶體中。當要重複取用資料時非常有用.例如當我们在一個小的熱(hot)數據集中查詢,或者運行一個像網頁搜索排序這樣的迭代演算法。作為一個簡單的例子,讓我们把 linesWithSpark 數據集標記在暫存中:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

暫存 100 行的文本文件來研究 Spark 這看起来很傻。真正讓人感興趣的部分是我们可以在非常大型的數據集中使用同樣的函數,甚至在 10 個或者 100 個節點中交叉計算。你同樣可以使用 bin/spark-shell 連接到一个 cluster 來繼亂掉编程指南中的方法進行交互操作。

results matching ""

    No results matching ""