Spark SQL 基本使用
创建 DataFrame
在 Spark 中,创建 DataFrame 的主要方法是 createDataFrame() 和 toDF()。
数据准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val columns = Seq("language","users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
通过 RDD 创建 DataFrame
toDF()
通过 toDF() 方法,可以直接将 RDD 转换为 DataFrame。默认情况下,它会将列命名为 “_1” 和 “_2”(本例中有两列)。
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
此外,toDF() 还可以直接指定列名:
val dfFromRDD1 = rdd.toDF("language","users_count")
dfFromRDD1.printSchema()
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)
tip
默认情况下,列的类型均为 String;如果需要指定列的数据类型,则需要使用 Schema,见下面的示意。
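例如,下面是一个简单的示意(沿用上文的 rdd):将 users_count 声明为 IntegerType,同时把字符串数据转换为 Int 以匹配 Schema。
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.Row

val typedSchema = StructType(Array(
  StructField("language", StringType, nullable = true),
  StructField("users_count", IntegerType, nullable = true)
))
// 原始数据中 users_count 是字符串,需要先转换为 Int 才能匹配 Schema
val typedRowRDD = rdd.map { case (language, count) => Row(language, count.toInt) }
val typedDF = spark.createDataFrame(typedRowRDD, typedSchema)
typedDF.printSchema() // users_count 将显示为 integer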
createDataFrame()
val dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns: _*)
createDataFrame() 方法还支持 RDD[Row] 数据类型,使用时需要定义 Schema:
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType(Array(
  StructField("language", StringType, nullable = true),
  StructField("users_count", StringType, nullable = true)
))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2))
val dfFromRDD3 = spark.createDataFrame(rowRDD, schema)
通过 List 和 Seq 集合创建 DataFrame
toDF()
import spark.implicits._
val dfFromData1 = data.toDF()
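一个假设性的示意:用 case class 代替元组,toDF() 会自动推断列名和类型(case class 需定义在方法体之外,否则无法生成隐式 Encoder):
case class Language(language: String, usersCount: Long)

val typedSeq = Seq(Language("Java", 20000L), Language("Python", 100000L), Language("Scala", 3000L))
val dfTyped = typedSeq.toDF()
dfTyped.printSchema()
// root
//  |-- language: string (nullable = true)
//  |-- usersCount: long (nullable = false)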
createDataFrame()
//From Data (USING createDataFrame)
val dfFromData2 = spark.createDataFrame(data).toDF(columns: _*)
//From Data (USING createDataFrame and Adding schema using StructType)
import scala.collection.JavaConverters._

val schema = StructType(Array(
  StructField("language", StringType, nullable = true),
  StructField("users_count", StringType, nullable = true)
))
val rowData = Seq(Row("Java", "20000"),
  Row("Python", "100000"),
  Row("Scala", "3000"))
// createDataFrame(rows, schema) 需要 java.util.List[Row],
// 这里用 asJava 显式转换(JavaConversions 的隐式转换已废弃)
val dfFromData3 = spark.createDataFrame(rowData.asJava, schema)
通过 CSV 创建 DataFrame
val df2 = spark.read.csv("/src/resources/file.csv")
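csv() 默认不会把首行当作表头,也不会推断列类型,常见做法是通过 option() 配置。下面是一个示意,文件路径沿用上例:
val dfCsvWithOptions = spark.read
  .option("header", "true")      // 将首行作为列名
  .option("inferSchema", "true") // 自动推断各列类型
  .option("delimiter", ",")      // 指定分隔符,默认为逗号
  .csv("/src/resources/file.csv")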
通过 TXT 创建 DataFrame
val df2 = spark.read.text("/src/resources/file.txt")
通过 JSON 创建 DataFrame
val df2 = spark.read.json("/src/resources/file.json")
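需要注意,json() 默认要求文件为 JSON Lines 格式(每行一个独立的 JSON 对象);如果整个文件是一个跨多行的 JSON,可以开启 multiLine 选项(示意):
val dfJsonMultiLine = spark.read
  .option("multiLine", "true")
  .json("/src/resources/file.json")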
通过 XML 创建 DataFrame
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-xml_2.11</artifactId>
    <version>0.6.0</version>
</dependency>
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "person")
  .xml("src/main/resources/persons.xml")
通过 MySQL 创建 DataFrame
val df_mysql = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:port/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .load()
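对于大表,JDBC 数据源还支持按数值列并行分区读取。下面是一个示意(partitionColumn 假设表中存在数值型的 id 列,上下界也仅为示例值):
val dfMysqlParallel = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:port/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "tablename")
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "id") // 假设的数值型分区列
  .option("lowerBound", "1")       // 分区列下界(示例值)
  .option("upperBound", "100000")  // 分区列上界(示例值)
  .option("numPartitions", "4")    // 并行读取的分区数
  .load()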
选择列
数据准备
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val data = Seq(("James", "Smith", "USA", "CA"),
("Michael", "Rose", "USA", "NY"),
("Robert", "Williams", "USA", "CA"),
("Maria", "Jones", "USA", "FL"))
val columns = Seq("firstname", "lastname", "country", "state")
val df = data.toDF(columns: _*)
df.show(false)
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James |Smith |USA |CA |
|Michael |Rose |USA |NY |
|Robert |Williams|USA |CA |
|Maria |Jones |USA |FL |
+---------+--------+-------+-----+
选择单列或多列
df.select("firstname", "lastname").show()
df.select(df("firstname"), df("lastname")).show()
//Using col function, use alias() to get alias name
import org.apache.spark.sql.functions.col
df.select(col("firstname").alias("fname"), col("lastname")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
| James| Smith|
| Michael| Rose|
| Robert|Williams|
| Maria| Jones|
+---------+--------+
+---------+--------+
|firstname|lastname|
+---------+--------+
| James| Smith|
| Michael| Rose|
| Robert|Williams|
| Maria| Jones|
+---------+--------+
+-------+--------+
| fname|lastname|
+-------+--------+
| James| Smith|
|Michael| Rose|
| Robert|Williams|
| Maria| Jones|
+-------+--------+
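除了 select(),还可以用 selectExpr() 直接书写 SQL 表达式,在选择列的同时完成重命名或计算(示意):
df.selectExpr("firstname as fname", "upper(lastname) as lname").show()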
选择所有列
//Show all columns from DataFrame
df.select("*").show()
val columnsAll = df.columns.map(m => col(m))
df.select(columnsAll: _*).show()
df.select(columns.map(m => col(m)): _*).show()
+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
| James| Smith| USA| CA|
| Michael| Rose| USA| NY|
| Robert|Williams| USA| CA|
| Maria| Jones| USA| FL|
+---------+--------+-------+-----+
根据列表选择列
val listCols = List("lastname", "country")
df.select(listCols.map(m => col(m)): _*).show()
+--------+-------+
|lastname|country|
+--------+-------+
| Smith| USA|
| Rose| USA|
|Williams| USA|
| Jones| USA|
+--------+-------+
选择前 N 列
//Select first 3 columns.
df.select(df.columns.slice(0, 3).map(m => col(m)): _*).show()
+---------+--------+-------+
|firstname|lastname|country|
+---------+--------+-------+
| James| Smith| USA|
| Michael| Rose| USA|
| Robert|Williams| USA|
| Maria| Jones| USA|
+---------+--------+-------+
根据位置或索引选择列
//Selects 4th column (index starts from zero)
df.select(df.columns(3)).show()
//Selects columns from index 2 to 4
df.select(df.columns.slice(2, 4).map(m => col(m)): _*).show()
+-----+
|state|
+-----+
| CA|
| NY|
| CA|
| FL|
+-----+
+-------+-----+
|country|state|
+-------+-----+
| USA| CA|
| USA| NY|
| USA| CA|
| USA| FL|
+-------+-----+
通过正则选择列
//Select columns by regular expression
df.select(df.colRegex("`^.*name*`")).show()
+---------+--------+
|firstname|lastname|
+---------+--------+
| James| Smith|
| Michael| Rose|
| Robert|Williams|
| Maria| Jones|
+---------+--------+
根据列名前缀或后缀选择列(startsWith / endsWith)
df.select(df.columns.filter(f => f.startsWith("first")).map(m => col(m)): _*).show()
df.select(df.columns.filter(f => f.endsWith("name")).map(m => col(m)): _*).show()
+---------+
|firstname|
+---------+
| James|
| Michael|
| Robert|
| Maria|
+---------+
+---------+--------+
|firstname|lastname|
+---------+--------+
| James| Smith|
| Michael| Rose|
| Robert|Williams|
| Maria| Jones|
+---------+--------+
选择多层级列
//Show Nested columns
val data2 = Seq(Row(Row("James", "", "Smith"), "OH", "M"),
Row(Row("Anna", "Rose", ""), "NY", "F"),
Row(Row("Julia", "", "Williams"), "OH", "F"),
Row(Row("Maria", "Anne", "Jones"), "NY", "M"),
Row(Row("Jen", "Mary", "Brown"), "NY", "M"),
Row(Row("Mike", "Mary", "Williams"), "OH", "M")
)
val schema = new StructType()
.add("name", new StructType()
.add("firstname", StringType)
.add("middlename", StringType)
.add("lastname", StringType))
.add("state", StringType)
.add("gender", StringType)
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema)
df2.printSchema()
df2.show(false)
df2.select("name").show(false)
df2.select("name.firstname", "name.lastname").show(false)
df2.select("name.*").show(false)
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
+----------------------+-----+------+
|name |state|gender|
+----------------------+-----+------+
|[James, , Smith] |OH |M |
|[Anna, Rose, ] |NY |F |
|[Julia, , Williams] |OH |F |
|[Maria, Anne, Jones] |NY |M |
|[Jen, Mary, Brown] |NY |M |
|[Mike, Mary, Williams]|OH |M |
+----------------------+-----+------+
+----------------------+
|name |
+----------------------+
|[James, , Smith] |
|[Anna, Rose, ] |
|[Julia, , Williams] |
|[Maria, Anne, Jones] |
|[Jen, Mary, Brown] |
|[Mike, Mary, Williams]|
+----------------------+
+---------+--------+
|firstname|lastname|
+---------+--------+
|James |Smith |
|Anna | |
|Julia |Williams|
|Maria |Jones |
|Jen |Brown |
|Mike |Williams|
+---------+--------+
+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James | |Smith |
|Anna |Rose | |
|Julia | |Williams|
|Maria |Anne |Jones |
|Jen |Mary |Brown |
|Mike |Mary |Williams|
+---------+----------+--------+
创建或修改列
数据准备
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}

val arrayStructureData = Seq(
  Row(Row("James ", "", "Smith"), "1", "M", 3100, List("Cricket", "Movies"), Map("hair" -> "black", "eye" -> "brown")),
  Row(Row("Michael ", "Rose", ""), "2", "M", 3100, List("Tennis"), Map("hair" -> "brown", "eye" -> "black")),
  Row(Row("Robert ", "", "Williams"), "3", "M", 3100, List("Cooking", "Football"), Map("hair" -> "red", "eye" -> "gray")),
  Row(Row("Maria ", "Anne", "Jones"), "4", "M", 3100, null, Map("hair" -> "blond", "eye" -> "red")),
  Row(Row("Jen", "Mary", "Brown"), "5", "M", 3100, List("Blogging"), Map("white" -> "black", "eye" -> "black"))
)
val arrayStructureSchema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("id", StringType)
  .add("gender", StringType)
  .add("salary", IntegerType)
  .add("Hobbies", ArrayType(StringType))
  .add("properties", MapType(StringType, StringType))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)
+---------------------+---+------+------+-------------------+------------------------------+
|name |id |gender|salary|Hobbies |properties |
+---------------------+---+------+------+-------------------+------------------------------+
|[James , , Smith] |1 |M |3100 |[Cricket, Movies] |[hair -> black, eye -> brown] |
|[Michael , Rose, ] |2 |M |3100 |[Tennis] |[hair -> brown, eye -> black] |
|[Robert , , Williams]|3 |M |3100 |[Cooking, Football]|[hair -> red, eye -> gray] |
|[Maria , Anne, Jones]|4 |M |3100 |null |[hair -> blond, eye -> red] |
|[Jen, Mary, Brown] |5 |M |3100 |[Blogging] |[white -> black, eye -> black]|
+---------------------+---+------+------+-------------------+------------------------------+
创建新列
import org.apache.spark.sql.functions.lit
df.withColumn("Country", lit("USA")).show(false)
//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
.withColumn("anotherColumn",lit("anotherValue")).show(false)
+---------------------+---+------+------+-------------------+------------------------------+-------+
|name |id |gender|salary|Hobbies |properties |Country|
+---------------------+---+------+------+-------------------+------------------------------+-------+
|[James , , Smith] |1 |M |3100 |[Cricket, Movies] |[hair -> black, eye -> brown] |USA |
|[Michael , Rose, ] |2 |M |3100 |[Tennis] |[hair -> brown, eye -> black] |USA |
|[Robert , , Williams]|3 |M |3100 |[Cooking, Football]|[hair -> red, eye -> gray] |USA |
|[Maria , Anne, Jones]|4 |M |3100 |null |[hair -> blond, eye -> red] |USA |
|[Jen, Mary, Brown] |5 |M |3100 |[Blogging] |[white -> black, eye -> black]|USA |
+---------------------+---+------+------+-------------------+------------------------------+-------+
+---------------------+---+------+------+-------------------+------------------------------+-------+-------------+
|name |id |gender|salary|Hobbies |properties |Country|anotherColumn|
+---------------------+---+------+------+-------------------+------------------------------+-------+-------------+
|[James , , Smith] |1 |M |3100 |[Cricket, Movies] |[hair -> black, eye -> brown] |USA |anotherValue |
|[Michael , Rose, ] |2 |M |3100 |[Tennis] |[hair -> brown, eye -> black] |USA |anotherValue |
|[Robert , , Williams]|3 |M |3100 |[Cooking, Football]|[hair -> red, eye -> gray] |USA |anotherValue |
|[Maria , Anne, Jones]|4 |M |3100 |null |[hair -> blond, eye -> red] |USA |anotherValue |
|[Jen, Mary, Brown] |5 |M |3100 |[Blogging] |[white -> black, eye -> black]|USA |anotherValue |
+---------------------+---+------+------+-------------------+------------------------------+-------+-------------+
tip
lit() 函数用于将常量(字面量)作为 DataFrame 列的值。
caution
withColumn() 只适合操作少量列:每次调用都会在执行计划中增加一次投影,链式调用过多会带来性能问题,可以改用一次 select() 完成,如下面的示意。
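下面是一个示意写法,把原有列与新增列拼成一个 Column 序列,一次 select() 即可得到与上面两次 withColumn 链式调用等价的结果:
import org.apache.spark.sql.functions.{col, lit}

// 原有列 + 新增列,合并为一个 Column 序列后一次性 select
val allCols = df.columns.map(col) ++ Seq(
  lit("USA").as("Country"),
  lit("anotherValue").as("anotherColumn"))
df.select(allCols: _*).show(false)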
修改列的值
import org.apache.spark.sql.functions.col
df.withColumn("salary", col("salary") * 100).show(false)
+---------------------+---+------+------+-------------------+------------------------------+
|name |id |gender|salary|Hobbies |properties |
+---------------------+---+------+------+-------------------+------------------------------+
|[James , , Smith] |1 |M |310000|[Cricket, Movies] |[hair -> black, eye -> brown] |
|[Michael , Rose, ] |2 |M |310000|[Tennis] |[hair -> brown, eye -> black] |
|[Robert , , Williams]|3 |M |310000|[Cooking, Football]|[hair -> red, eye -> gray] |
|[Maria , Anne, Jones]|4 |M |310000|null |[hair -> blond, eye -> red] |
|[Jen, Mary, Brown] |5 |M |310000|[Blogging] |[white -> black, eye -> black]|
+---------------------+---+------+------+-------------------+------------------------------+
根据现有列增加新列
import org.apache.spark.sql.functions.col
df.withColumn("CopiedColumn", col("salary") * -1).show(false)
+---------------------+---+------+------+-------------------+------------------------------+------------+
|name |id |gender|salary|Hobbies |properties |CopiedColumn|
+---------------------+---+------+------+-------------------+------------------------------+------------+
|[James , , Smith] |1 |M |3100 |[Cricket, Movies] |[hair -> black, eye -> brown] |-3100 |
|[Michael , Rose, ] |2 |M |3100 |[Tennis] |[hair -> brown, eye -> black] |-3100 |
|[Robert , , Williams]|3 |M |3100 |[Cooking, Football]|[hair -> red, eye -> gray] |-3100 |
|[Maria , Anne, Jones]|4 |M |3100 |null |[hair -> blond, eye -> red] |-3100 |
|[Jen, Mary, Brown] |5 |M |3100 |[Blogging] |[white -> black, eye -> black]|-3100 |
+---------------------+---+------+------+-------------------+------------------------------+------------+
修改列类型
import org.apache.spark.sql.functions.col
val df1 = df.withColumn("salary", col("salary").cast("Integer"))
df1.printSchema()
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
|-- Hobbies: array (nullable = true)
| |-- element: string (containsNull = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
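cast() 除了接收类型名字符串,也可以接收 DataType 对象;还可以在 selectExpr() 中用 SQL 语法完成类型转换。下面是两种等价的示意写法:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// 写法一:cast(DataType)
val dfCast1 = df.withColumn("salary", col("salary").cast(IntegerType))
// 写法二:selectExpr 中的 SQL cast 语法
val dfCast2 = df.selectExpr("name", "id", "gender", "cast(salary as int) as salary", "Hobbies", "properties")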
修改多列
在 Spark DataFrame 中,修改多列时一般不建议链式调用 withColumn() 方法;推荐创建一个临时视图,再用 SQL 的 SELECT 一次性完成修改:
df.createOrReplaceTempView("PERSON")
spark.sql(
"""SELECT salary*100 as salary,
| salary*-1 as CopiedColumn,
| 'USA' as country FROM PERSON""".stripMargin).show()
+------+------------+-------+
|salary|CopiedColumn|country|
+------+------------+-------+
|310000| -3100| USA|
|310000| -3100| USA|
|310000| -3100| USA|
|310000| -3100| USA|
|310000| -3100| USA|
+------+------------+-------+
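也可以不注册临时视图,直接用一次 select() 写出等价的 DataFrame API 版本(示意):
import org.apache.spark.sql.functions.{col, lit}

df.select(
  (col("salary") * 100).as("salary"),
  (col("salary") * -1).as("CopiedColumn"),
  lit("USA").as("country")
).show()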
列重命名
df.withColumnRenamed("gender", "sex").show(false)
+---------------------+---+---+------+-------------------+------------------------------+
|name |id |sex|salary|Hobbies |properties |
+---------------------+---+---+------+-------------------+------------------------------+
|[James , , Smith] |1 |M |3100 |[Cricket, Movies] |[hair -> black, eye -> brown] |
|[Michael , Rose, ] |2 |M |3100 |[Tennis] |[hair -> brown, eye -> black] |
|[Robert , , Williams]|3 |M |3100 |[Cooking, Football]|[hair -> red, eye -> gray] |
|[Maria , Anne, Jones]|4 |M |3100 |null |[hair -> blond, eye -> red] |
|[Jen, Mary, Brown] |5 |M |3100 |[Blogging] |[white -> black, eye -> black]|
+---------------------+---+---+------+-------------------+------------------------------+
删除列
df.drop("CopiedColumn")
拆分列为多列
val columns = Seq("name", "address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
("Maria, Garcia", "3456 Walnut st, Newark, NJ, 94732"))
var dfFromData = spark.createDataFrame(data).toDF(columns: _*)
dfFromData.printSchema()
val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split(",")
  val addSplit = f.getAs[String](1).split(",")
  (nameSplit(0), nameSplit(1), addSplit(0), addSplit(1), addSplit(2), addSplit(3))
})
val finalDF = newDF.toDF("First Name", "Last Name",
  "Address Line1", "City", "State", "zipCode")
finalDF.printSchema()
finalDF.show(false)
root
|-- name: string (nullable = true)
|-- address: string (nullable = true)
root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Address Line1: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- zipCode: string (nullable = true)
+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert | Smith |1 Main st | Newark| NJ | 92537 |
|Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 |
+----------+---------+--------------+-------+-----+-------+
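上面的 map 写法依赖强类型 API;也可以用内置的 split() 函数写出等价版本,并顺便用 trim() 去掉拆分后残留的前导空格(示意):
import org.apache.spark.sql.functions.{col, split, trim}

val splitDF = dfFromData
  .withColumn("nameParts", split(col("name"), ","))   // 按逗号拆分为数组列
  .withColumn("addrParts", split(col("address"), ","))
  .select(
    trim(col("nameParts").getItem(0)).as("First Name"),
    trim(col("nameParts").getItem(1)).as("Last Name"),
    trim(col("addrParts").getItem(0)).as("Address Line1"),
    trim(col("addrParts").getItem(1)).as("City"),
    trim(col("addrParts").getItem(2)).as("State"),
    trim(col("addrParts").getItem(3)).as("zipCode")
  )
splitDF.show(false)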
列重命名
数据准备
val data = Seq(Row(Row("James ", "", "Smith"), "36636", "M", 3000),
Row(Row("Michael ", "Rose", ""), "40288", "M", 4000),
Row(Row("Robert ", "", "Williams"), "42114", "M", 4000),
Row(Row("Maria ", "Anne", "Jones"), "39192", "F", 4000),
Row(Row("Jen", "Mary", "Brown"), "", "F", -1)
)
val schema = new StructType()
.add("name", new StructType()
.add("firstname", StringType)
.add("middlename", StringType)
.add("lastname", StringType))
.add("dob", StringType)
.add("gender", StringType)
.add("salary", IntegerType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.printSchema()
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- dob: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
基本使用
df.withColumnRenamed("dob", "DateOfBirth")
  .printSchema()
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- DateOfBirth: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
重命名多列
val df2 = df.withColumnRenamed("dob", "DateOfBirth")
  .withColumnRenamed("salary", "salary_amount")
df2.printSchema()
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- DateOfBirth: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary_amount: integer (nullable = true)
使用 Spark StructType 重命名
val schema2 = new StructType()
  .add("fname", StringType)
  .add("middlename", StringType)
  .add("lname", StringType)
df.select(col("name").cast(schema2),
  col("dob"),
  col("gender"),
  col("salary"))
.printSchema()
root
|-- name: struct (nullable = true)
| |-- fname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lname: string (nullable = true)
|-- dob: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
使用 Select 重命名
df.select(col("name.firstname").as("fname"),
col("name.middlename").as("mname"),
col("name.lastname").as("lname"),
col("dob"), col("gender"), col("salary"))
.printSchema()
root
|-- fname: string (nullable = true)
|-- mname: string (nullable = true)
|-- lname: string (nullable = true)
|-- dob: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
使用 withColumn 重命名
val df4 = df.withColumn("fname", col("name.firstname"))
.withColumn("mname", col("name.middlename"))
.withColumn("lname", col("name.lastname"))
.drop("name")
df4.printSchema()
root
|-- dob: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
|-- fname: string (nullable = true)
|-- mname: string (nullable = true)
|-- lname: string (nullable = true)
使用 col() 批量修改列名
val old_columns = Seq("dob","gender","salary","fname","mname","lname")
val new_columns = Seq("DateOfBirth","Sex","salary","firstName","middleName","lastName")
val columnsList = old_columns.zip(new_columns).map(f => col(f._1).as(f._2))
val df5 = df4.select(columnsList: _*)
df5.printSchema()
root
|-- DateOfBirth: string (nullable = true)
|-- Sex: string (nullable = true)
|-- salary: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- middleName: string (nullable = true)
|-- lastName: string (nullable = true)
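如果是一次性重命名所有列,也可以直接调用 toDF() 并传入新列名(数量必须与现有列一致,示意):
val df6 = df4.toDF("DateOfBirth", "Sex", "salary", "firstName", "middleName", "lastName")
df6.printSchema()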
删除列
数据准备
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val structureData = Seq(
  Row("James", "", "Smith", "36636", "NewYork", 3100),
  Row("Michael", "Rose", "", "40288", "California", 4300),
  Row("Robert", "", "Williams", "42114", "Florida", 1400),
  Row("Maria", "Anne", "Jones", "39192", "Florida", 5500),
  Row("Jen", "Mary", "Brown", "34561", "NewYork", 3000)
)
val structureSchema = new StructType()
  .add("firstname", StringType)
  .add("middlename", StringType)
  .add("lastname", StringType)
  .add("id", StringType)
  .add("location", StringType)
  .add("salary", IntegerType)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData), structureSchema)
df.printSchema()
root
|-- firstname: string (nullable = true)
|-- middlename: string (nullable = true)
|-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- salary: integer (nullable = true)
删除一列
val df2 = df.drop("firstname") //First signature: 列名字符串
df2.printSchema()
df.drop(df("firstname")).printSchema() //Second signature: Column 对象
//import org.apache.spark.sql.functions.col is required
df.drop(col("firstname")).printSchema() //Third signature: col() 函数
root
|-- middlename: string (nullable = true)
|-- lastname: string (nullable = true)
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- salary: integer (nullable = true)
删除多列
//Refering more than one column
df.drop("firstname", "middlename", "lastname")
.printSchema()
// using array/sequence of columns
val cols = Seq("firstname", "middlename", "lastname")
df.drop(cols: _*)
  .printSchema()
root
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- salary: integer (nullable = true)
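如果要删除的列需要按规则筛选,也可以先过滤 df.columns,再批量传给 drop()(示意,效果与上面相同):
// 筛选出所有以 name 结尾的列并删除
val nameCols = df.columns.filter(_.endsWith("name"))
df.drop(nameCols: _*).printSchema()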