Skip to main content

spark RDD

Parallelize

在 Spark Shell 中使用 sc.parallelize

scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at
:24

在 Scala 中使用 sparkContext.parallelize

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RDDParallelize {

def main(args: Array[String]): Unit = {
val spark:SparkSession = SparkSession.builder().master("local[1]")
.appName("SparkByExamples.com")
.getOrCreate()
val rdd:RDD[Int] = spark.sparkContext.parallelize(List(1,2,3,4,5))
val rddCollect:Array[Int] = rdd.collect()
println("Number of Partitions: "+rdd.getNumPartitions)
println("Action: First element: "+rdd.first())
println("Action: RDD converted to Array[Int] : ")
rddCollect.foreach(println)
}
}

执行结果如下:

Number of Partitions: 1
Action: First element: 1
Action: RDD converted to Array[Int] :
1
2
3
4
5

创建空 RDD

sparkContext.parallelize(Seq.empty[String])

读取 txt 文件

Spark core 提供 textFile()wholeTextFiles() 两个方法来读取单个或多个文件。

  • textFile() - 读取单个或多个文件至 RDD
  • wholeTextFiles() - 同样读取单个或多个文件至 RDD,相比较 textFile(),还包含了所读取文件的文件信息

数据准备

文件名称文件内容
text01.txtOne,1
text02.txtTwo,2
text03.txtThree,3
text04.txtFour,4
invalid.txtInvalid,I

在本例中,测试数据位于 Idea 项目的根路径 datas 目录下,完整路径为 datas/files/text01.txt

读取一个目录中的所有文件

val rdd = sc.textFile("datas/files/*")
rdd.foreach(f=>{
println(f)
})

执行结果:

One,1s
Invalid,Is
Four,4
Two,2
Three,3
caution

注意读取的目录中不能有嵌套目录。

info

如果程序运行在集群中,打印数据结果之前必须执行 collect 操作:

rdd.collect.foreach(f=>{
println(f)
})

下面再看看使用 wholeTextFiles() 方法读取目录文件:

val rddWhole = sc.wholeTextFiles("datas/files/*")
rddWhole.foreach(f=>{
println(f._1+"=>"+f._2)
})

执行结果如下:

file:/home/spark/Projects/spark-learning/datas/files/text02.txt=>Two,2
file:/home/spark/Projects/spark-learning/datas/files/invalid.txt=>Invalid,Is
file:/home/spark/Projects/spark-learning/datas/files/text01.txt=>One,1s
file:/home/spark/Projects/spark-learning/datas/files/text04.txt=>Four,4
file:/home/spark/Projects/spark-learning/datas/files/text03.txt=>Three,3

可以看到 wholeTextFiles() 方法返回的结果是一个元组,同时包含了文件的路径信息和文件内容。

读取多个文件

如果明确文件名称,可以同时读取多个文件:

val rdd3 = sc.textFile("datas/files/text01.txt,datas/files/text02.txt")
rdd3.foreach(f=>{
println(f)
})
Two,2
One,1s

通过文件名模式匹配读取文件

val rdd2 = sc.textFile("datas/files/text*.txt")
rdd2.foreach(f=>{
println(f)
})
One,1s
Four,4
Three,3
Two,2ss

读取 CSV 文件

数据准备

文件名文件内容
text01.csvCol1,Col2
text02.csvCol1,Col2
text03.csvCol1,Col2
text04.csvCol1,Col2
invalid.csvCol1,Col2

加载 CSV 文件

由于 Spark Core 本身并没有支持直接读取 CSV 文件的方法,所以需要先使用 textFile() 方法返回 RDD[String] 格式数据,然后通过 split() 方法将每一行根据分隔符进行划分,最终得到 RDD[Array[String]] 类型数据;例子如下:

val rddFromFile = sc.textFile("datas/csv/text01.csv")

val rdd = rddFromFile.map(f=>{
f.split(",")
})

rdd.collect().foreach(f=>{
println("Col1:"+f(0)+",Col2:"+f(1))
})
Col1:Col1,Col2:Col2
Col1:One,Col2:1
Col1:Eleven,Col2:11

创建 RDD

Resilient Distributed Datasets (RDD) Spark 中的基础数据结构,在实际使用中,我们可以通过多种方式创建 RDD 数据。

通过列表创建

val rdd=spark.sparkContext.parallelize(Seq(("Java", 20000), 
("Python", 100000), ("Scala", 3000)))
rdd.foreach(println)
(Python,100000)
(Scala,3000)
(Java,20000)

通过文件创建

val rdd = spark.sparkContext.textFile("/path/textFile.txt")

val rdd2 = spark.sparkContext.wholeTextFiles("/path/textFile.txt")
rdd2.foreach(record=>println("FileName : "+record._1+", FileContents :"+record._2))

通过其它 RDD 创建

val rdd3 = rdd.map(row=>{(row._1,row._2+100)})

通过 DataFrames 或者 DataSet 创建

val myRdd2 = spark.range(20).toDF().rdd

RDD 转换

由于 RDD 具有不可更改的属性,所以对 RDD 的转换操作不会修改原始数据,而是生成新的 RDD,我们称之为

RDD lineage

转换方法归纳

方法使用说明
cache()缓存 RDD
filter()过滤数据,返回新的 RDD
flatMap()Returns flattern map meaning if you have a dataset with array, it converts each elements in a array as a row. In other words it return 0 or more items in output for each element in dataset.
map()遍历数据,对每一行数据进行处理后返回新的 RDD
mapPartitions()Similar to map, but executs transformation function on each partition, This gives better performance than map function
mapPartitionsWithIndex()Similar to map Partitions, but also provides func with an integer value representing the index of the partition.
randomSplit()Splits the RDD by the weights specified in the argument. For example rdd.randomSplit(0.7,0.3)
union()Comines elements from source dataset and the argument and returns combined dataset. This is similar to union function in Math set operations.
sample()Returns the sample dataset.
intersection()Returns the dataset which contains elements in both source dataset and an argument
distinct()数据去重.
repartition()Return a dataset with number of partition specified in the argument. This operation reshuffles the RDD randamly, It could either return lesser or more partioned RDD based on the input supplied.
coalesce()Similar to repartition by operates better when we want to the decrease the partitions. Betterment acheives by reshuffling the data from fewer nodes compared with all nodes by repartition.

字数统计案例

数据准备

这里准备一个包含多行数据的 txt 文件,而转换操作的最终目的是统计文件中以 a 开头的单词出现的次数。

Project Gutenberg’s
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
Alice’s Adventures in Wonderland
by Lewis Carroll
This eBook is for the use
of anyone anywhere
at no cost and with
This eBook is for the use
of anyone anywhere
at no cost and with
Project Gutenberg’s
Alice’s Adventures in Wonderland
...

加载文件

val rdd:RDD[String] = sc.textFile("datas/test.txt")

flatmap()

val rdd2 = rdd.flatMap(f=>f.split(" "))

该方法首先将每一行数据通过空格划分为单词数组,然后将数组打平,最终得到每个单词为一行的新 RDD 数据:

Project
by
Gutenberg’s
Lewis
Carroll
Alice’s
Adventures
in
Wonderland
This
eBook
is
by
for
Lewis
the
...

map()

val rdd3:RDD[(String,Int)]= rdd2.map(m=>(m,1))

使用 map() 方法将每一行的单词转换为元组,值设为 1 ,表明该单词出现 1 次。

(by,1)
(Lewis,1)
(Project,1)
(Carroll,1)
(Gutenberg’s,1)
(This,1)
(eBook,1)
(Alice’s,1)
(is,1)
(Adventures,1)
...

filter()

val rdd4 = rdd3.filter(a=> a._1.startsWith("a"))

将以 a 开头的单词过滤出来。

(anyone,1)
(anyone,1)
(anywhere,1)
(anywhere,1)
(at,1)
(at,1)
(and,1)
(and,1)
(anyone,1)
(anyone,1)
(anywhere,1)
(anywhere,1)
...

reduceByKey()

val rdd5 = rdd3.reduceByKey(_ + _)

根据元组的 key 合并数据,将相同 key 的值进行 + 操作。

(anyone,27)
(at,27)
(and,27)
(anywhere,27)

sortByKey()

val rdd6 = rdd5.map(a=>(a._2,a._1)).sortByKey()

根据每个单词出现次数排序。

RDD Actions

数据准备

val inputRDD = sc.parallelize(List(("Z", 1), ("A", 20), ("B", 30), ("C", 40), ("B", 30), ("B", 60)))
val listRdd = sc.parallelize(List(1, 2, 3, 4, 5, 3, 2))

aggregate - action

def param0= (accu:Int, v:Int) => accu + v
def param1= (accu1:Int,accu2:Int) => accu1 + accu2
println("aggregate : "+listRdd.aggregate(0)(param0,param1))

def param3= (accu:Int, v:(String,Int)) => accu + v._2
def param4= (accu1:Int,accu2:Int) => accu1 + accu2
println("aggregate : "+inputRDD.aggregate(0)(param3,param4))

其中,param0 参数用于分区内的聚合计算,param1 用于分区之间的聚合计算。

aggregate : 20
aggregate : 181

fold - action

println("fold :  " + listRdd.fold(0){ (acc,v) =>
val sum = acc+v
sum
})

println("fold : " + inputRDD.fold(("Total", 0)) { (acc: (String, Int), v: (String, Int)) =>
val sum = acc._2 + v._2
("Total", sum)
})
fold :  20
fold : (Total,181)

reduce

//reduce
println("reduce : "+listRdd.reduce(_ + _))
//Output: reduce : 20
println("reduce alternate : "+listRdd.reduce((x, y) => x + y))
//Output: reduce alternate : 20
println("reduce : "+inputRDD.reduce((x, y) => ("Total",x._2 + y._2)))
//Output: reduce : (Total,181)

collect

返回 RDD 数据的数组类型。

//Collect
val data:Array[Int] = listRdd.collect()
data.foreach(println)

count - action

返回 RDD 数据总数。

//count, countApprox, countApproxDistinct
println("Count : "+listRdd.count)
//Output: Count : 7

first

返回 RDD 第一个元素。

println("first :  "+listRdd.first())
//Output: first : 1
println("first : "+inputRDD.first())
//Output: first : (Z,1)

top

//top
println("top : "+listRdd.top(2).mkString(","))
//Output: take : 5,4
println("top : "+inputRDD.top(2).mkString(","))
//Output: take : (Z,1),(C,40)

min

//min
println("min : "+listRdd.min())
//Output: min : 1
println("min : "+inputRDD.min())
//Output: min : (A,20)

max

//max
println("max : "+listRdd.max())
//Output: max : 5
println("max : "+inputRDD.max())
//Output: max : (Z,1)

take

println("take : "+listRdd.take(2).mkString(","))
//Output: take : 1,2

RDD 转换为 DataFrame

准备数据

import spark.implicits._

val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)

toDF

val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)

toDF() 可指定列名:

val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)

createDataFrame

val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
dfFromRDD2.printSchema()
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)