SparkSQL StructType Examples

刘超 · 11 hours ago · 58 reads

Contents
  1. A simple Structs example
  2. Using Spark StructType and StructField with DataFrame

 

1. A simple Structs example

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val structureData = Seq(
    Row(Row("James ","","Smith"),"36636","M",3100),
    Row(Row("Michael ","Rose",""),"40288","M",4300),
    Row(Row("Robert ","","Williams"),"42114","M",1400),
    Row(Row("Maria ","Anne","Jones"),"39192","F",5500),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
)

val structureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

val df2 = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)
df2.printSchema()
df2.show()

// Exiting paste mode, now interpreting.

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+-------------------+-----+------+------+
|               name|   id|gender|salary|
+-------------------+-----+------+------+
|    [James ,,Smith]|36636|     M|  3100|
|   [Michael ,Rose,]|40288|     M|  4300|
|[Robert ,,Williams]|42114|     M|  1400|
|[Maria ,Anne,Jones]|39192|     F|  5500|
|   [Jen,Mary,Brown]|     |     F|    -1|
+-------------------+-----+------+------+

scala> df2.select($"name").show()
+-------------------+
|               name|
+-------------------+
|    [James ,,Smith]|
|   [Michael ,Rose,]|
|[Robert ,,Williams]|
|[Maria ,Anne,Jones]|
|   [Jen,Mary,Brown]|
+-------------------+
// Access a single field inside the struct
scala> df2.select($"name.firstname").show()
+---------+
|firstname|
+---------+
|   James |
| Michael |
|  Robert |
|   Maria |
|      Jen|
+---------+
// Bracket syntax like $"name['firstname']" is not supported
scala> df2.select($"name['firstname']").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`name['firstname']`' given input columns: [name, id, gender, salary];;
'Project ['name['firstname']]
+- LogicalRDD [name#31, id#32, gender#33, salary#34]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
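For nested fields, several equivalent Column-based accessors are supported instead of bracket syntax (a sketch using the df2 defined above; these are standard Spark Column APIs):

```scala
import org.apache.spark.sql.functions.col

// Each of these selects the nested firstname field from the name struct:
df2.select(col("name").getField("firstname")).show() // Column.getField
df2.select(col("name.firstname")).show()             // dot path inside col()
df2.select(df2("name")("firstname")).show()          // Column.apply syntax
```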

 

2. Using Spark StructType and StructField with DataFrame

  When creating a Spark DataFrame, we can specify its structure with the StructType and StructField classes. As mentioned in the introduction, a StructType is a collection of StructFields that defines column names, data types, and a nullable flag. With StructField we can also add nested struct schemas, an ArrayType for arrays, and a MapType for key-value pairs.

The StructType and StructField case classes are defined as follows.

case class StructType(fields: Array[StructField])

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty)
The following example demonstrates a simple use of StructType and StructField to create a DataFrame schema, together with sample data that conforms to it.

scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions._

scala> val simpleData = Seq(
     |     Row("James","","Smith","36636","M",3000),
     |     Row("Michael","Rose","","40288","M",4000),
     |     Row("Robert","","Williams","42114","M",4000),
     |     Row("Maria","Anne","Jones","39192","F",4000),
     |     Row("Jen","Mary","Brown","","F",-1)
     | )

scala> val simpleSchema = StructType(
     |     Array(
     |         StructField("firstname",StringType,true),
     |         StructField("middlename",StringType,true),
     |         StructField("lastname",StringType,true),
     |         StructField("id", StringType, true),
     |         StructField("gender", StringType, true),
     |         StructField("salary", IntegerType, true)
     |     )
     | )

scala> val df = spark.createDataFrame(spark.sparkContext.parallelize(simpleData),simpleSchema)

scala> df.printSchema()
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 
scala> df.show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+


Defining a nested StructType

When working with DataFrames we often need nested struct columns, which can be defined using SQL StructType.

In the example below, I instantiate a StructType and use the add method (rather than StructField) to add column names and data types. Note that for the "name" column the data type is a nested StructType.

 

scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions._

scala> :paste
// Entering paste mode (ctrl-D to finish)

val structureData = Seq(
    Row(Row("James ","","Smith"),"36636","M",3100),
    Row(Row("Michael ","Rose",""),"40288","M",4300),
    Row(Row("Robert ","","Williams"),"42114","M",1400),
    Row(Row("Maria ","Anne","Jones"),"39192","F",5500),
    Row(Row("Jen","Mary","Brown"),"","F",-1)
)

val structureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("id",StringType)
    .add("gender",StringType)
    .add("salary",IntegerType)

val df2 = spark.createDataFrame(spark.sparkContext.parallelize(structureData),structureSchema)
df2.printSchema()
df2.show()

// Exiting paste mode, now interpreting.

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+-------------------+-----+------+------+
|               name|   id|gender|salary|
+-------------------+-----+------+------+
|    [James ,,Smith]|36636|     M|  3100|
|   [Michael ,Rose,]|40288|     M|  4300|
|[Robert ,,Williams]|42114|     M|  1400|
|[Maria ,Anne,Jones]|39192|     F|  5500|
|   [Jen,Mary,Brown]|     |     F|    -1|
+-------------------+-----+------+------+


Creating a StructType from a JSON file

If there are many columns and the DataFrame's structure changes from time to time, it is good practice to load the SQL StructType schema from a JSON file. Note that the JSON definition uses a different layout; you can generate it from an existing schema by calling schema.prettyJson().

 

// The contents of schema.json are as follows
{
  "type" : "struct",
  "fields" : [ {
    "name" : "name",
    "type" : {
      "type" : "struct",
      "fields" : [ {
        "name" : "firstname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "middlename",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      }, {
        "name" : "lastname",
        "type" : "string",
        "nullable" : true,
        "metadata" : { }
      } ]
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "dob",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "gender",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "salary",
    "type" : "integer",
    "nullable" : true,
    "metadata" : { }
  } ]
}
// Example code: load the schema from schema.json on the classpath
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val url = ClassLoader.getSystemResource("schema.json")
val schemaSource = Source.fromFile(url.getFile).getLines.mkString
val schemaFromJson = DataType.fromJson(schemaSource).asInstanceOf[StructType]
val df3 = spark.createDataFrame(
  spark.sparkContext.parallelize(structureData), schemaFromJson)
df3.printSchema()

The printed output is the same as in the previous section. You could also keep the field names, types, and nullable flags in a comma-separated file and use it to build a StructType programmatically; I will leave that as an exercise.
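A StructType can also be built programmatically from comma-separated name,type,nullable entries (a sketch; the file contents and the parseType helper are assumptions for illustration):

```scala
import org.apache.spark.sql.types._

// Hypothetical CSV lines of the form "name,type,nullable"
val csvLines = Seq("firstname,string,true", "id,string,true", "salary,integer,true")

// Minimal type lookup; extend with more DataTypes as needed
def parseType(s: String): DataType = s match {
  case "string"  => StringType
  case "integer" => IntegerType
  case other     => throw new IllegalArgumentException(s"Unsupported type: $other")
}

// Fold each line into a StructField, then wrap them in a StructType
val schemaFromCsv = StructType(csvLines.map { line =>
  val Array(name, tpe, nullable) = line.split(",")
  StructField(name, parseType(tpe), nullable.toBoolean)
})
```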

Adding to and changing a DataFrame's structure

Using the Spark SQL function struct(), we can change the structure of an existing DataFrame and add a new StructType to it. The example below shows how to copy columns from one structure into another and add a new column.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val df4 = df2
val updatedDF = df4.withColumn("OtherInfo",struct(
    col("id").as("identifier"),
    col("gender").as("gender"),
    col("salary").as("salary"),
    when(col("salary").cast(IntegerType) < 2000,"Low")
    .when(col("salary").cast(IntegerType) < 4000,"Medium")
    .otherwise("High").alias("Salary_Grade")
    )
).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(false)

// Exiting paste mode, now interpreting.

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+-------------------+---------------------+
|name               |OtherInfo            |
+-------------------+---------------------+
|[James ,,Smith]    |[36636,M,3100,Medium]|
|[Michael ,Rose,]   |[40288,M,4300,High]  |
|[Robert ,,Williams]|[42114,M,1400,Low]   |
|[Maria ,Anne,Jones]|[39192,F,5500,High]  |
|[Jen,Mary,Brown]   |[,F,-1,Low]          |
+-------------------+---------------------+


Here it copies "gender", "salary", and "id" into a new struct "OtherInfo" and adds a new column "Salary_Grade".

Using SQL ArrayType and MapType

SQL StructType also supports ArrayType and MapType, for defining DataFrame columns that hold array and map collections respectively. In the example below, the "hobbies" column is defined as ArrayType(StringType) and "properties" as MapType(StringType,StringType), meaning both keys and values are strings.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val arrayStructureData = Seq(
    Row(Row("James ","","Smith"),List("Cricket","Movies"),Map("hair"->"black","eye"->"brown")),
    Row(Row("Michael ","Rose",""),List("Tennis"),Map("hair"->"brown","eye"->"black")),
    Row(Row("Robert ","","Williams"),List("Cooking","Football"),Map("hair"->"red","eye"->"gray")),
    Row(Row("Maria ","Anne","Jones"),null,Map("hair"->"blond","eye"->"red")),
    Row(Row("Jen","Mary","Brown"),List("Blogging"),Map("white"->"black","eye"->"black"))
)

val arrayStructureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("hobbies", ArrayType(StringType))
    .add("properties", MapType(StringType,StringType))

val df5 = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
df5.printSchema()
df5.show()

// Exiting paste mode, now interpreting.

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-------------------+-------------------+--------------------+
|               name|            hobbies|          properties|
+-------------------+-------------------+--------------------+
|    [James ,,Smith]|  [Cricket, Movies]|Map(hair -> black...|
|   [Michael ,Rose,]|           [Tennis]|Map(hair -> brown...|
|[Robert ,,Williams]|[Cooking, Football]|Map(hair -> red, ...|
|[Maria ,Anne,Jones]|               null|Map(hair -> blond...|
|   [Jen,Mary,Brown]|         [Blogging]|Map(white -> blac...|
+-------------------+-------------------+--------------------+

The output above shows the schema and the DataFrame data. Note that the field "hobbies" is an array type and "properties" is a map type.
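Once array and map columns are defined this way, their elements can be read with the standard Column functions (a sketch assuming the df5 above):

```scala
import org.apache.spark.sql.functions.{col, explode_outer}

df5.select(col("hobbies").getItem(0).as("first_hobby")).show()   // array element by index
df5.select(col("properties").getItem("hair").as("hair")).show()  // map value by key
df5.select(explode_outer(col("hobbies")).as("hobby")).show()     // one row per element, keeping null arrays
```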


Converting a case class to a Spark StructType

Spark SQL also provides encoders that convert a case class into a StructType object. If you are using an older version of Spark, you can instead use a Scala reflection hack to convert a case class to a schema. Both examples are shown here.

scala> case class Name(first:String,last:String,middle:String)
defined class Name

scala> case class Employee(fullName:Name,age:Integer,gender:String)
defined class Employee

scala> import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.ScalaReflection

scala> val schema = ScalaReflection.schemaFor[Employee].dataType.asInstanceOf[StructType]

scala> import org.apache.spark.sql.Encoders

scala> val encoderSchema = Encoders.product[Employee].schema

scala> encoderSchema.printTreeString()
root
 |-- fullName: struct (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- last: string (nullable = true)
 |    |-- middle: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)

Creating a StructType from a DDL string

As with loading a structure from a JSON string, we can also create one from a DDL string, using the static function StructType.fromDDL() on the SQL StructType class. You can also generate a DDL string from a schema using toDDL(). Calling printTreeString() on the struct object prints a schema similar to what the printSchema function returns.

val ddlSchemaStr = "`fullName` STRUCT<`first`: STRING,`last`: STRING, `middle`: STRING>,`age` INT,`gender` STRING"
val ddlSchema = StructType.fromDDL(ddlSchemaStr)
ddlSchema.printTreeString()

  You may see the following error:

1. error: value fromDDL is not a member of object org.apache.spark.sql.types.StructType (thrown on Spark versions that do not yet provide fromDDL)

Checking whether a field exists in a DataFrame

If you want to perform checks on a DataFrame's metadata, for example whether a column exists in the DataFrame or a field exists in a column's data type, we can do this easily with a few functions on SQL StructType and StructField.

scala> println(df.schema.fieldNames.contains("firstname"))
true

scala> println(df.schema.contains(StructField("firstname",StringType,true)))
true


This example returns "true" in both cases. For the second check, if you use IntegerType instead of StringType it returns false, because the firstname column's data type is String and contains() compares every attribute of the field. Similarly, you can also check whether two schemas are equal, and more.
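Schema comparison builds on the fact that StructType equality is structural: two schemas built through different APIs compare equal when their fields match (a minimal sketch):

```scala
import org.apache.spark.sql.types._

// Built with add() versus the StructType(Array(...)) constructor:
val s1 = new StructType().add("a", StringType).add("b", IntegerType)
val s2 = StructType(Array(StructField("a", StringType), StructField("b", IntegerType)))

println(s1 == s2) // structural equality: same names, types, nullability
```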


Note: this article belongs to its author; do not repost without the author's permission.
