spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter
Date Wed, 24 Jun 2015 22:03:43 GMT
Repository: spark
Updated Branches:
  refs/heads/master fb32c3889 -> 8ab50765c


[SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter

This PR introduces `CatalystSchemaConverter` for converting Parquet schema to Spark SQL schema and vice versa.  Original conversion code in `ParquetTypesConverter` is removed. Benefits of the new version are:

1. When converting Spark SQL schemas, it generates standard Parquet schemas conforming to [the most updated Parquet format spec] [1]. Converting to old style Parquet schemas is also supported via feature flag `spark.sql.parquet.followParquetFormatSpec` (which is set to `false` for now, and should be set to `true` after both read and write paths are fixed).

   Note that although this version of Parquet format spec hasn't been officially release yet, Parquet MR 1.7.0 already sticks to it. So it should be safe to follow.

1. It implements backwards-compatibility rules described in the most updated Parquet format spec. Thus can recognize more schema patterns generated by other/legacy systems/tools.
1. Code organization follows convention used in [parquet-mr] [2], which is easier to follow. (Structure of `CatalystSchemaConverter` is similar to `AvroSchemaConverter`).

To fully implement backwards-compatibility rules in both read and write path, we also need to update `CatalystRowConverter` (which is responsible for converting Parquet records to `Row`s), `RowReadSupport`, and `RowWriteSupport`. These would be done in follow-up PRs.

TODO

- [x] More schema conversion test cases for legacy schema patterns.

[1]: https://github.com/apache/parquet-format/blob/ea095226597fdbecd60c2419d96b54b2fdb4ae6c/LogicalTypes.md
[2]: https://github.com/apache/parquet-mr/

Author: Cheng Lian <lian@databricks.com>

Closes #6617 from liancheng/spark-6777 and squashes the following commits:

2a2062d [Cheng Lian] Don't convert decimals without precision information
b60979b [Cheng Lian] Adds a constructor which accepts a Configuration, and fixes default value of assumeBinaryIsString
743730f [Cheng Lian] Decimal scale shouldn't be larger than precision
a104a9e [Cheng Lian] Fixes Scala style issue
1f71d8d [Cheng Lian] Adds feature flag to allow falling back to old style Parquet schema conversion
ba84f4b [Cheng Lian] Fixes MapType schema conversion bug
13cb8d5 [Cheng Lian] Fixes MiMa failure
81de5b0 [Cheng Lian] Fixes UDT, workaround read path, and add tests
28ef95b [Cheng Lian] More AnalysisExceptions
b10c322 [Cheng Lian] Replaces require() with analysisRequire() which throws AnalysisException
cceaf3f [Cheng Lian] Implements backwards compatibility rules in CatalystSchemaConverter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ab50765
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ab50765
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ab50765

Branch: refs/heads/master
Commit: 8ab50765cd793169091d983b50d87a391f6ac1f4
Parents: fb32c38
Author: Cheng Lian <lian@databricks.com>
Authored: Wed Jun 24 15:03:43 2015 -0700
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Jun 24 15:03:43 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   7 +-
 .../apache/spark/sql/types/DecimalType.scala    |   9 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  14 +
 .../sql/parquet/CatalystSchemaConverter.scala   | 565 ++++++++++++++
 .../spark/sql/parquet/ParquetTableSupport.scala |   6 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 374 +---------
 .../spark/sql/parquet/ParquetIOSuite.scala      |   6 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  | 736 +++++++++++++++++--
 .../sql/hive/execution/SQLQuerySuite.scala      |   2 +-
 9 files changed, 1297 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f678c69..6f86a50 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -69,7 +69,12 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
+              "org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
+            // SPARK-6777 Implements backwards compatibility rules in CatalystSchemaConverter
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.ParquetTypeInfo"),
+            ProblemFilters.exclude[MissingClassProblem](
+              "org.apache.spark.sql.parquet.ParquetTypeInfo$")
           )
         case v if v.startsWith("1.4") =>
           Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
index 407dc27..18cdfa7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -20,13 +20,18 @@ package org.apache.spark.sql.types
 import scala.reflect.runtime.universe.typeTag
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 
 /** Precision parameters for a Decimal */
-case class PrecisionInfo(precision: Int, scale: Int)
-
+case class PrecisionInfo(precision: Int, scale: Int) {
+  if (scale > precision) {
+    throw new AnalysisException(
+      s"Decimal scale ($scale) cannot be greater than precision ($precision).")
+  }
+}
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2653526..9a10a23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -264,6 +264,14 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
+    key = "spark.sql.parquet.followParquetFormatSpec",
+    defaultValue = Some(false),
+    doc = "Wether to stick to Parquet format specification when converting Parquet schema to " +
+      "Spark SQL schema and vice versa.  Sticks to the specification if set to true; falls back " +
+      "to compatible mode if set to false.",
+    isPublic = false)
+
   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
     key = "spark.sql.parquet.output.committer.class",
     defaultValue = Some(classOf[ParquetOutputCommitter].getName),
@@ -499,6 +507,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
 
   /**
+   * When set to true, sticks to Parquet format spec when converting Parquet schema to Spark SQL
+   * schema and vice versa.  Otherwise, falls back to compatible mode.
+   */
+  private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+
+  /**
    * When set to true, partition pruning for in-memory columnar tables is enabled.
    */
   private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
new file mode 100644
index 0000000..4fd3e93
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
+import org.apache.parquet.schema.Type.Repetition._
+import org.apache.parquet.schema._
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLConf}
+
+/**
+ * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] and
+ * vice versa.
+ *
+ * Parquet format backwards-compatibility rules are respected when converting Parquet
+ * [[MessageType]] schemas.
+ *
+ * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ *
+ * @constructor
+ * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
+ *        [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
+ *        [[StructType]].
+ * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
+ *        [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
+ *        [[StructType]].  Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
+ *        has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
+ *        described in Parquet format spec.
+ * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
+ *        converting Spark SQL [[StructType]] to Parquet [[MessageType]].  For Spark 1.4.x and
+ *        prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
+ *        uses non-standard LIST and MAP structure.  Note that the current Parquet format spec is
+ *        backwards-compatible with these settings.  If this argument is set to `false`, we fallback
+ *        to old style non-standard behaviors.
+ */
+private[parquet] class CatalystSchemaConverter(
+    private val assumeBinaryIsString: Boolean,
+    private val assumeInt96IsTimestamp: Boolean,
+    private val followParquetFormatSpec: Boolean) {
+
+  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
+  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
+  def this() = this(
+    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+
+  def this(conf: SQLConf) = this(
+    assumeBinaryIsString = conf.isParquetBinaryAsString,
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    followParquetFormatSpec = conf.followParquetFormatSpec)
+
+  def this(conf: Configuration) = this(
+    assumeBinaryIsString =
+      conf.getBoolean(
+        SQLConf.PARQUET_BINARY_AS_STRING.key,
+        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
+    assumeInt96IsTimestamp =
+      conf.getBoolean(
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
+    followParquetFormatSpec =
+      conf.getBoolean(
+        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
+        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+
+  /**
+   * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
+   */
+  def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
+
+  private def convert(parquetSchema: GroupType): StructType = {
+    val fields = parquetSchema.getFields.map { field =>
+      field.getRepetition match {
+        case OPTIONAL =>
+          StructField(field.getName, convertField(field), nullable = true)
+
+        case REQUIRED =>
+          StructField(field.getName, convertField(field), nullable = false)
+
+        case REPEATED =>
+          throw new AnalysisException(
+            s"REPEATED not supported outside LIST or MAP. Type: $field")
+      }
+    }
+
+    StructType(fields)
+  }
+
+  /**
+   * Converts a Parquet [[Type]] to a Spark SQL [[DataType]].
+   */
+  def convertField(parquetType: Type): DataType = parquetType match {
+    case t: PrimitiveType => convertPrimitiveField(t)
+    case t: GroupType => convertGroupField(t.asGroupType())
+  }
+
+  private def convertPrimitiveField(field: PrimitiveType): DataType = {
+    val typeName = field.getPrimitiveTypeName
+    val originalType = field.getOriginalType
+
+    def typeString =
+      if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
+
+    def typeNotImplemented() =
+      throw new AnalysisException(s"Parquet type not yet supported: $typeString")
+
+    def illegalType() =
+      throw new AnalysisException(s"Illegal Parquet type: $typeString")
+
+    // When maxPrecision = -1, we skip precision range check, and always respect the precision
+    // specified in field.getDecimalMetadata.  This is useful when interpreting decimal types stored
+    // as binaries with variable lengths.
+    def makeDecimalType(maxPrecision: Int = -1): DecimalType = {
+      val precision = field.getDecimalMetadata.getPrecision
+      val scale = field.getDecimalMetadata.getScale
+
+      CatalystSchemaConverter.analysisRequire(
+        maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
+        s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
+
+      DecimalType(precision, scale)
+    }
+
+    field.getPrimitiveTypeName match {
+      case BOOLEAN => BooleanType
+
+      case FLOAT => FloatType
+
+      case DOUBLE => DoubleType
+
+      case INT32 =>
+        field.getOriginalType match {
+          case INT_8 => ByteType
+          case INT_16 => ShortType
+          case INT_32 | null => IntegerType
+          case DATE => DateType
+          case DECIMAL => makeDecimalType(maxPrecisionForBytes(4))
+          case TIME_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT64 =>
+        field.getOriginalType match {
+          case INT_64 | null => LongType
+          case DECIMAL => makeDecimalType(maxPrecisionForBytes(8))
+          case TIMESTAMP_MILLIS => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case INT96 =>
+        CatalystSchemaConverter.analysisRequire(
+          assumeInt96IsTimestamp,
+          "INT96 is not supported unless it's interpreted as timestamp. " +
+            s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
+        TimestampType
+
+      case BINARY =>
+        field.getOriginalType match {
+          case UTF8 => StringType
+          case null if assumeBinaryIsString => StringType
+          case null => BinaryType
+          case DECIMAL => makeDecimalType()
+          case _ => illegalType()
+        }
+
+      case FIXED_LEN_BYTE_ARRAY =>
+        field.getOriginalType match {
+          case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
+          case INTERVAL => typeNotImplemented()
+          case _ => illegalType()
+        }
+
+      case _ => illegalType()
+    }
+  }
+
+  private def convertGroupField(field: GroupType): DataType = {
+    Option(field.getOriginalType).fold(convert(field): DataType) {
+      // A Parquet list is represented as a 3-level structure:
+      //
+      //   <list-repetition> group <name> (LIST) {
+      //     repeated group list {
+      //       <element-repetition> <element-type> element;
+      //     }
+      //   }
+      //
+      // However, according to the most recent Parquet format spec (not released yet up until
+      // writing), some 2-level structures are also recognized for backwards-compatibility.  Thus,
+      // we need to check whether the 2nd level or the 3rd level refers to list element type.
+      //
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+      case LIST =>
+        CatalystSchemaConverter.analysisRequire(
+          field.getFieldCount == 1, s"Invalid list type $field")
+
+        val repeatedType = field.getType(0)
+        CatalystSchemaConverter.analysisRequire(
+          repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
+
+        if (isElementType(repeatedType, field.getName)) {
+          ArrayType(convertField(repeatedType), containsNull = false)
+        } else {
+          val elementType = repeatedType.asGroupType().getType(0)
+          val optional = elementType.isRepetition(OPTIONAL)
+          ArrayType(convertField(elementType), containsNull = optional)
+        }
+
+      // scalastyle:off
+      // `MAP_KEY_VALUE` is for backwards-compatibility
+      // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
+      // scalastyle:on
+      case MAP | MAP_KEY_VALUE =>
+        CatalystSchemaConverter.analysisRequire(
+          field.getFieldCount == 1 && !field.getType(0).isPrimitive,
+          s"Invalid map type: $field")
+
+        val keyValueType = field.getType(0).asGroupType()
+        CatalystSchemaConverter.analysisRequire(
+          keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
+          s"Invalid map type: $field")
+
+        val keyType = keyValueType.getType(0)
+        CatalystSchemaConverter.analysisRequire(
+          keyType.isPrimitive,
+          s"Map key type is expected to be a primitive type, but found: $keyType")
+
+        val valueType = keyValueType.getType(1)
+        val valueOptional = valueType.isRepetition(OPTIONAL)
+        MapType(
+          convertField(keyType),
+          convertField(valueType),
+          valueContainsNull = valueOptional)
+
+      case _ =>
+        throw new AnalysisException(s"Unrecognized Parquet type: $field")
+    }
+  }
+
+  // scalastyle:off
+  // Here we implement Parquet LIST backwards-compatibility rules.
+  // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
+  // scalastyle:on
+  private def isElementType(repeatedType: Type, parentName: String) = {
+    {
+      // For legacy 2-level list types with primitive element type, e.g.:
+      //
+      //    // List<Integer> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated int32 element;
+      //    }
+      //
+      repeatedType.isPrimitive
+    } || {
+      // For legacy 2-level list types whose element type is a group type with 2 or more fields,
+      // e.g.:
+      //
+      //    // List<Tuple<String, Integer>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group element {
+      //        required binary str (UTF8);
+      //        required int32 num;
+      //      };
+      //    }
+      //
+      repeatedType.asGroupType().getFieldCount > 1
+    } || {
+      // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group array {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == "array"
+    } || {
+      // For Parquet data generated by parquet-thrift, e.g.:
+      //
+      //    // List<OneTuple<String>> (nullable list, non-null elements)
+      //    optional group my_list (LIST) {
+      //      repeated group my_list_tuple {
+      //        required binary str (UTF8);
+      //      };
+      //    }
+      //
+      repeatedType.getName == s"${parentName}_tuple"
+    }
+  }
+
+  /**
+   * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
+   */
+  def convert(catalystSchema: StructType): MessageType = {
+    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+  }
+
+  /**
+   * Converts a Spark SQL [[StructField]] to a Parquet [[Type]].
+   */
+  def convertField(field: StructField): Type = {
+    convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
+  }
+
+  private def convertField(field: StructField, repetition: Type.Repetition): Type = {
+    CatalystSchemaConverter.checkFieldName(field.name)
+
+    field.dataType match {
+      // ===================
+      // Simple atomic types
+      // ===================
+
+      case BooleanType =>
+        Types.primitive(BOOLEAN, repetition).named(field.name)
+
+      case ByteType =>
+        Types.primitive(INT32, repetition).as(INT_8).named(field.name)
+
+      case ShortType =>
+        Types.primitive(INT32, repetition).as(INT_16).named(field.name)
+
+      case IntegerType =>
+        Types.primitive(INT32, repetition).named(field.name)
+
+      case LongType =>
+        Types.primitive(INT64, repetition).named(field.name)
+
+      case FloatType =>
+        Types.primitive(FLOAT, repetition).named(field.name)
+
+      case DoubleType =>
+        Types.primitive(DOUBLE, repetition).named(field.name)
+
+      case StringType =>
+        Types.primitive(BINARY, repetition).as(UTF8).named(field.name)
+
+      case DateType =>
+        Types.primitive(INT32, repetition).as(DATE).named(field.name)
+
+      // NOTE: !! This timestamp type is not specified in Parquet format spec !!
+      // However, Impala and older versions of Spark SQL use INT96 to store timestamps with
+      // nanosecond precision (not TIME_MILLIS or TIMESTAMP_MILLIS described in the spec).
+      case TimestampType =>
+        Types.primitive(INT96, repetition).named(field.name)
+
+      case BinaryType =>
+        Types.primitive(BINARY, repetition).named(field.name)
+
+      // =====================================
+      // Decimals (for Spark version <= 1.4.x)
+      // =====================================
+
+      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
+      // always store decimals in fixed-length byte arrays.
+      case DecimalType.Fixed(precision, scale)
+        if precision <= maxPrecisionForBytes(8) && !followParquetFormatSpec =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(minBytesForPrecision(precision))
+          .named(field.name)
+
+      case dec @ DecimalType() if !followParquetFormatSpec =>
+        throw new AnalysisException(
+          s"Data type $dec is not supported. " +
+            s"When ${SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key} is set to false," +
+            "decimal precision and scale must be specified, " +
+            "and precision must be less than or equal to 18.")
+
+      // =====================================
+      // Decimals (follow Parquet format spec)
+      // =====================================
+
+      // Uses INT32 for 1 <= precision <= 9
+      case DecimalType.Fixed(precision, scale)
+        if precision <= maxPrecisionForBytes(4) && followParquetFormatSpec =>
+        Types
+          .primitive(INT32, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses INT64 for 1 <= precision <= 18
+      case DecimalType.Fixed(precision, scale)
+        if precision <= maxPrecisionForBytes(8) && followParquetFormatSpec =>
+        Types
+          .primitive(INT64, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .named(field.name)
+
+      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
+      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+        Types
+          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .as(DECIMAL)
+          .precision(precision)
+          .scale(scale)
+          .length(minBytesForPrecision(precision))
+          .named(field.name)
+
+      case dec @ DecimalType.Unlimited if followParquetFormatSpec =>
+        throw new AnalysisException(
+          s"Data type $dec is not supported. Decimal precision and scale must be specified.")
+
+      // ===================================================
+      // ArrayType and MapType (for Spark versions <= 1.4.x)
+      // ===================================================
+
+      // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
+      // LIST structure.  This behavior mimics parquet-hive (1.6.0rc3).  Note that this case is
+      // covered by the backwards-compatibility rules implemented in `isElementType()`.
+      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+        // <list-repetition> group <name> (LIST) {
+        //   optional group bag {
+        //     repeated <element-type> element;
+        //   }
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          Types
+            .buildGroup(REPEATED)
+            .addField(convertField(StructField("element", elementType, nullable)))
+            .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME))
+
+      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
+      // LIST structure.  This behavior mimics parquet-avro (1.6.0rc3).  Note that this case is
+      // covered by the backwards-compatibility rules implemented in `isElementType()`.
+      case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated <element-type> element;
+        // }
+        ConversionPatterns.listType(
+          repetition,
+          field.name,
+          convertField(StructField("element", elementType, nullable), REPEATED))
+
+      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
+      // MAP_KEY_VALUE.  This is covered by `convertGroupField(field: GroupType): DataType`.
+      case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group map (MAP_KEY_VALUE) {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        ConversionPatterns.mapType(
+          repetition,
+          field.name,
+          convertField(StructField("key", keyType, nullable = false)),
+          convertField(StructField("value", valueType, valueContainsNull)))
+
+      // ==================================================
+      // ArrayType and MapType (follow Parquet format spec)
+      // ==================================================
+
+      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+        // <list-repetition> group <name> (LIST) {
+        //   repeated group list {
+        //     <element-repetition> <element-type> element;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(LIST)
+          .addField(
+            Types.repeatedGroup()
+              .addField(convertField(StructField("element", elementType, containsNull)))
+              .named("list"))
+          .named(field.name)
+
+      case MapType(keyType, valueType, valueContainsNull) =>
+        // <map-repetition> group <name> (MAP) {
+        //   repeated group key_value {
+        //     required <key-type> key;
+        //     <value-repetition> <value-type> value;
+        //   }
+        // }
+        Types
+          .buildGroup(repetition).as(MAP)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(convertField(StructField("key", keyType, nullable = false)))
+              .addField(convertField(StructField("value", valueType, valueContainsNull)))
+              .named("key_value"))
+          .named(field.name)
+
+      // ===========
+      // Other types
+      // ===========
+
+      case StructType(fields) =>
+        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
+          builder.addField(convertField(field))
+        }.named(field.name)
+
+      case udt: UserDefinedType[_] =>
+        convertField(field.copy(dataType = udt.sqlType))
+
+      case _ =>
+        throw new AnalysisException(s"Unsupported data type $field.dataType")
+    }
+  }
+
+  // Max precision of a decimal value stored in `numBytes` bytes
+  private def maxPrecisionForBytes(numBytes: Int): Int = {
+    Math.round(                               // convert double to long
+      Math.floor(Math.log10(                  // number of base-10 digits
+        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
+      .asInstanceOf[Int]
+  }
+
+  // Min byte counts needed to store decimals with various precisions
+  private val minBytesForPrecision: Array[Int] = Array.tabulate(38) { precision =>
+    var numBytes = 1
+    while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
+      numBytes += 1
+    }
+    numBytes
+  }
+}
+
+
+private[parquet] object CatalystSchemaConverter {
+  def checkFieldName(name: String): Unit = {
+    // ,;{}()\n\t= and space are special characters in Parquet schema
+    analysisRequire(
+      !name.matches(".*[ ,;{}()\n\t=].*"),
+      s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
+         |Please use alias to rename it.
+       """.stripMargin.split("\n").mkString(" "))
+  }
+
+  def analysisRequire(f: => Boolean, message: String): Unit = {
+    if (!f) {
+      throw new AnalysisException(message)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index e65fa00..0d96a1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -86,8 +86,7 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
     // TODO: Why it can be null?
     if (schema == null)  {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(
-        parquetSchema, false, true)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false, true)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)
@@ -105,8 +104,7 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
       // If the parquet file is thrift derived, there is a good chance that
       // it will have the thrift class in metadata.
       val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
-      parquetSchema = ParquetTypesConverter
-        .convertFromAttributes(requestedAttributes, isThriftDerived)
+      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index ba2a35b..4d5199a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -29,214 +29,19 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
-import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types._
 
 
-/** A class representing Parquet info fields we care about, for passing back to Parquet */
-private[parquet] case class ParquetTypeInfo(
-  primitiveType: ParquetPrimitiveTypeName,
-  originalType: Option[ParquetOriginalType] = None,
-  decimalMetadata: Option[DecimalMetadata] = None,
-  length: Option[Int] = None)
-
 private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean = ctype match {
     case _: NumericType | BooleanType | StringType | BinaryType => true
     case _: DataType => false
   }
 
-  def toPrimitiveDataType(
-      parquetType: ParquetPrimitiveType,
-      binaryAsString: Boolean,
-      int96AsTimestamp: Boolean): DataType = {
-    val originalType = parquetType.getOriginalType
-    val decimalInfo = parquetType.getDecimalMetadata
-    parquetType.getPrimitiveTypeName match {
-      case ParquetPrimitiveTypeName.BINARY
-        if (originalType == ParquetOriginalType.UTF8 || binaryAsString) => StringType
-      case ParquetPrimitiveTypeName.BINARY => BinaryType
-      case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
-      case ParquetPrimitiveTypeName.DOUBLE => DoubleType
-      case ParquetPrimitiveTypeName.FLOAT => FloatType
-      case ParquetPrimitiveTypeName.INT32
-        if originalType == ParquetOriginalType.DATE => DateType
-      case ParquetPrimitiveTypeName.INT32 => IntegerType
-      case ParquetPrimitiveTypeName.INT64 => LongType
-      case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
-      case ParquetPrimitiveTypeName.INT96 =>
-        // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
-        throw new AnalysisException("Potential loss of precision: cannot convert INT96")
-      case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-        if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
-          // TODO: for now, our reader only supports decimals that fit in a Long
-          DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
-      case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType")
-    }
-  }
-
-  /**
-   * Converts a given Parquet `Type` into the corresponding
-   * [[org.apache.spark.sql.types.DataType]].
-   *
-   * We apply the following conversion rules:
-   * <ul>
-   *   <li> Primitive types are converter to the corresponding primitive type.</li>
-   *   <li> Group types that have a single field that is itself a group, which has repetition
-   *        level `REPEATED`, are treated as follows:<ul>
-   *          <li> If the nested group has name `values`, the surrounding group is converted
-   *               into an [[ArrayType]] with the corresponding field type (primitive or
-   *               complex) as element type.</li>
-   *          <li> If the nested group has name `map` and two fields (named `key` and `value`),
-   *               the surrounding group is converted into a [[MapType]]
-   *               with the corresponding key and value (value possibly complex) types.
-   *               Note that we currently assume map values are not nullable.</li>
-   *   <li> Other group types are converted into a [[StructType]] with the corresponding
-   *        field types.</li></ul></li>
-   * </ul>
-   * Note that fields are determined to be `nullable` if and only if their Parquet repetition
-   * level is not `REQUIRED`.
-   *
-   * @param parquetType The type to convert.
-   * @return The corresponding Catalyst type.
-   */
-  def toDataType(parquetType: ParquetType,
-                 isBinaryAsString: Boolean,
-                 isInt96AsTimestamp: Boolean): DataType = {
-    def correspondsToMap(groupType: ParquetGroupType): Boolean = {
-      if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
-        false
-      } else {
-        // This mostly follows the convention in ``parquet.schema.ConversionPatterns``
-        val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-        keyValueGroup.getRepetition == Repetition.REPEATED &&
-          keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
-          keyValueGroup.getFieldCount == 2 &&
-          keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
-          keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
-      }
-    }
-
-    def correspondsToArray(groupType: ParquetGroupType): Boolean = {
-      groupType.getFieldCount == 1 &&
-        groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
-        groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
-    }
-
-    if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString, isInt96AsTimestamp)
-    } else {
-      val groupType = parquetType.asGroupType()
-      parquetType.getOriginalType match {
-        // if the schema was constructed programmatically there may be hints how to convert
-        // it inside the metadata via the OriginalType field
-        case ParquetOriginalType.LIST => { // TODO: check enums!
-          assert(groupType.getFieldCount == 1)
-          val field = groupType.getFields.apply(0)
-          if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
-            val bag = field.asGroupType()
-            assert(bag.getFieldCount == 1)
-            ArrayType(
-              toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
-              containsNull = true)
-          } else {
-            ArrayType(
-              toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
-          }
-        }
-        case ParquetOriginalType.MAP => {
-          assert(
-            !groupType.getFields.apply(0).isPrimitive,
-            "Parquet Map type malformatted: expected nested group for map!")
-          val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-          assert(
-            keyValueGroup.getFieldCount == 2,
-            "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-
-          val keyType =
-            toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
-          val valueType =
-            toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
-          MapType(keyType, valueType,
-            keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
-        }
-        case _ => {
-          // Note: the order of these checks is important!
-          if (correspondsToMap(groupType)) { // MapType
-            val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-            assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-
-            val keyType =
-              toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp)
-            val valueType =
-              toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString, isInt96AsTimestamp)
-            MapType(keyType, valueType,
-              keyValueGroup.getFields.apply(1).getRepetition != Repetition.REQUIRED)
-          } else if (correspondsToArray(groupType)) { // ArrayType
-            val field = groupType.getFields.apply(0)
-            if (field.getName == CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME) {
-              val bag = field.asGroupType()
-              assert(bag.getFieldCount == 1)
-              ArrayType(
-                toDataType(bag.getFields.apply(0), isBinaryAsString, isInt96AsTimestamp),
-                containsNull = true)
-            } else {
-              ArrayType(
-                toDataType(field, isBinaryAsString, isInt96AsTimestamp), containsNull = false)
-            }
-          } else { // everything else: StructType
-            val fields = groupType
-              .getFields
-              .map(ptype => new StructField(
-              ptype.getName,
-              toDataType(ptype, isBinaryAsString, isInt96AsTimestamp),
-              ptype.getRepetition != Repetition.REQUIRED))
-            StructType(fields)
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * For a given Catalyst [[org.apache.spark.sql.types.DataType]] return
-   * the name of the corresponding Parquet primitive type or None if the given type
-   * is not primitive.
-   *
-   * @param ctype The type to convert
-   * @return The name of the corresponding Parquet type properties
-   */
-  def fromPrimitiveDataType(ctype: DataType): Option[ParquetTypeInfo] = ctype match {
-    case StringType => Some(ParquetTypeInfo(
-      ParquetPrimitiveTypeName.BINARY, Some(ParquetOriginalType.UTF8)))
-    case BinaryType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.BINARY))
-    case BooleanType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.BOOLEAN))
-    case DoubleType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.DOUBLE))
-    case FloatType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FLOAT))
-    case IntegerType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
-    // There is no type for Byte or Short so we promote them to INT32.
-    case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
-    case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
-    case DateType => Some(ParquetTypeInfo(
-      ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
-    case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
-    case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
-    case DecimalType.Fixed(precision, scale) if precision <= 18 =>
-      // TODO: for now, our writer only supports decimals that fit in a Long
-      Some(ParquetTypeInfo(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
-        Some(ParquetOriginalType.DECIMAL),
-        Some(new DecimalMetadata(precision, scale)),
-        Some(BYTES_FOR_PRECISION(precision))))
-    case _ => None
-  }
-
   /**
    * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
    */
@@ -248,177 +53,29 @@ private[parquet] object ParquetTypesConverter extends Logging {
     length
   }
 
-  /**
-   * Converts a given Catalyst [[org.apache.spark.sql.types.DataType]] into
-   * the corresponding Parquet `Type`.
-   *
-   * The conversion follows the rules below:
-   * <ul>
-   *   <li> Primitive types are converted into Parquet's primitive types.</li>
-   *   <li> [[org.apache.spark.sql.types.StructType]]s are converted
-   *        into Parquet's `GroupType` with the corresponding field types.</li>
-   *   <li> [[org.apache.spark.sql.types.ArrayType]]s are converted
-   *        into a 2-level nested group, where the outer group has the inner
-   *        group as sole field. The inner group has name `values` and
-   *        repetition level `REPEATED` and has the element type of
-   *        the array as schema. We use Parquet's `ConversionPatterns` for this
-   *        purpose.</li>
-   *   <li> [[org.apache.spark.sql.types.MapType]]s are converted
-   *        into a nested (2-level) Parquet `GroupType` with two fields: a key
-   *        type and a value type. The nested group has repetition level
-   *        `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
-   *        for this purpose</li>
-   * </ul>
-   * Parquet's repetition level is generally set according to the following rule:
-   * <ul>
-   *   <li> If the call to `fromDataType` is recursive inside an enclosing `ArrayType` or
-   *   `MapType`, then the repetition level is set to `REPEATED`.</li>
-   *   <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
-   *   type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
-   * </ul>
-   *
-   *@param ctype The type to convert
-   * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
-   *             whose type is converted
-   * @param nullable When true indicates that the attribute is nullable
-   * @param inArray When true indicates that this is a nested attribute inside an array.
-   * @return The corresponding Parquet type.
-   */
-  def fromDataType(
-      ctype: DataType,
-      name: String,
-      nullable: Boolean = true,
-      inArray: Boolean = false,
-      toThriftSchemaNames: Boolean = false): ParquetType = {
-    val repetition =
-      if (inArray) {
-        Repetition.REPEATED
-      } else {
-        if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
-      }
-    val arraySchemaName = if (toThriftSchemaNames) {
-      name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
-    } else {
-      CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
-    }
-    val typeInfo = fromPrimitiveDataType(ctype)
-    typeInfo.map {
-      case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
-        val builder = ParquetTypes.primitive(primitiveType, repetition).as(originalType.orNull)
-        for (len <- length) {
-          builder.length(len)
-        }
-        for (metadata <- decimalMetadata) {
-          builder.precision(metadata.getPrecision).scale(metadata.getScale)
-        }
-        builder.named(name)
-    }.getOrElse {
-      ctype match {
-        case udt: UserDefinedType[_] => {
-          fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
-        }
-        case ArrayType(elementType, false) => {
-          val parquetElementType = fromDataType(
-            elementType,
-            arraySchemaName,
-            nullable = false,
-            inArray = true,
-            toThriftSchemaNames)
-          ConversionPatterns.listType(repetition, name, parquetElementType)
-        }
-        case ArrayType(elementType, true) => {
-          val parquetElementType = fromDataType(
-            elementType,
-            arraySchemaName,
-            nullable = true,
-            inArray = false,
-            toThriftSchemaNames)
-          ConversionPatterns.listType(
-            repetition,
-            name,
-            new ParquetGroupType(
-              Repetition.REPEATED,
-              CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME,
-              parquetElementType))
-        }
-        case StructType(structFields) => {
-          val fields = structFields.map {
-            field => fromDataType(field.dataType, field.name, field.nullable,
-                                  inArray = false, toThriftSchemaNames)
-          }
-          new ParquetGroupType(repetition, name, fields.toSeq)
-        }
-        case MapType(keyType, valueType, valueContainsNull) => {
-          val parquetKeyType =
-            fromDataType(
-              keyType,
-              CatalystConverter.MAP_KEY_SCHEMA_NAME,
-              nullable = false,
-              inArray = false,
-              toThriftSchemaNames)
-          val parquetValueType =
-            fromDataType(
-              valueType,
-              CatalystConverter.MAP_VALUE_SCHEMA_NAME,
-              nullable = valueContainsNull,
-              inArray = false,
-              toThriftSchemaNames)
-          ConversionPatterns.mapType(
-            repetition,
-            name,
-            parquetKeyType,
-            parquetValueType)
-        }
-        case _ => throw new AnalysisException(s"Unsupported datatype $ctype")
-      }
-    }
-  }
-
-  def convertToAttributes(parquetSchema: ParquetType,
-                          isBinaryAsString: Boolean,
-                          isInt96AsTimestamp: Boolean): Seq[Attribute] = {
-    parquetSchema
-      .asGroupType()
-      .getFields
-      .map(
-        field =>
-          new AttributeReference(
-            field.getName,
-            toDataType(field, isBinaryAsString, isInt96AsTimestamp),
-            field.getRepetition != Repetition.REQUIRED)())
+  def convertToAttributes(
+      parquetSchema: MessageType,
+      isBinaryAsString: Boolean,
+      isInt96AsTimestamp: Boolean): Seq[Attribute] = {
+    val converter = new CatalystSchemaConverter(
+      isBinaryAsString, isInt96AsTimestamp, followParquetFormatSpec = false)
+    converter.convert(parquetSchema).toAttributes
   }
 
-  def convertFromAttributes(attributes: Seq[Attribute],
-                            toThriftSchemaNames: Boolean = false): MessageType = {
-    checkSpecialCharacters(attributes)
-    val fields = attributes.map(
-      attribute =>
-        fromDataType(attribute.dataType, attribute.name, attribute.nullable,
-                     toThriftSchemaNames = toThriftSchemaNames))
-    new MessageType("root", fields)
+  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+    val converter = new CatalystSchemaConverter()
+    converter.convert(StructType.fromAttributes(attributes))
   }
 
   def convertFromString(string: String): Seq[Attribute] = {
     Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
       case s: StructType => s.toAttributes
-      case other => throw new AnalysisException(s"Can convert $string to row")
-    }
-  }
-
-  private def checkSpecialCharacters(schema: Seq[Attribute]) = {
-    // ,;{}()\n\t= and space character are special characters in Parquet schema
-    schema.map(_.name).foreach { name =>
-      if (name.matches(".*[ ,;{}()\n\t=].*")) {
-        throw new AnalysisException(
-          s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
-             |Please use alias to rename it.
-           """.stripMargin.split("\n").mkString(" "))
-      }
+      case other => sys.error(s"Can convert $string to row")
     }
   }
 
   def convertToString(schema: Seq[Attribute]): String = {
-    checkSpecialCharacters(schema)
+    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     StructType.fromAttributes(schema).json
   }
 
@@ -450,8 +107,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ParquetTypesConverter.convertToString(attributes))
     // TODO: add extra data, e.g., table name, date, etc.?
 
-    val parquetSchema: MessageType =
-      ParquetTypesConverter.convertFromAttributes(attributes)
+    val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes)
     val metaData: FileMetaData = new FileMetaData(
       parquetSchema,
       extraMetadata,

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 47a7be1..7b16eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -99,7 +99,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
   }
 
   test("fixed-length decimals") {
-
     def makeDecimalRDD(decimal: DecimalType): DataFrame =
       sqlContext.sparkContext
         .parallelize(0 to 1000)
@@ -158,6 +157,11 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     checkParquetFile(data)
   }
 
+  test("array and double") {
+    val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
+    checkParquetFile(data)
+  }
+
   test("struct") {
     val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
     withParquetDataFrame(data) { df =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 171a656..d0bfcde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -24,26 +24,109 @@ import org.apache.parquet.schema.MessageTypeParser
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 
-class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
-  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
+abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest {
+  val sqlContext = TestSQLContext
 
   /**
    * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
    */
-  private def testSchema[T <: Product: ClassTag: TypeTag](
-      testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
-    test(testName) {
-      val actual = ParquetTypesConverter.convertFromAttributes(
-        ScalaReflection.attributesFor[T], isThriftDerived)
-      val expected = MessageTypeParser.parseMessageType(messageType)
+  protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
+      testName: String,
+      messageType: String,
+      binaryAsString: Boolean = true,
+      int96AsTimestamp: Boolean = true,
+      followParquetFormatSpec: Boolean = false,
+      isThriftDerived: Boolean = false): Unit = {
+    testSchema(
+      testName,
+      StructType.fromAttributes(ScalaReflection.attributesFor[T]),
+      messageType,
+      binaryAsString,
+      int96AsTimestamp,
+      followParquetFormatSpec,
+      isThriftDerived)
+  }
+
+  protected def testParquetToCatalyst(
+      testName: String,
+      sqlSchema: StructType,
+      parquetSchema: String,
+      binaryAsString: Boolean = true,
+      int96AsTimestamp: Boolean = true,
+      followParquetFormatSpec: Boolean = false,
+      isThriftDerived: Boolean = false): Unit = {
+    val converter = new CatalystSchemaConverter(
+      assumeBinaryIsString = binaryAsString,
+      assumeInt96IsTimestamp = int96AsTimestamp,
+      followParquetFormatSpec = followParquetFormatSpec)
+
+    test(s"sql <= parquet: $testName") {
+      val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
+      val expected = sqlSchema
+      assert(
+        actual === expected,
+        s"""Schema mismatch.
+           |Expected schema: ${expected.json}
+           |Actual schema:   ${actual.json}
+         """.stripMargin)
+    }
+  }
+
+  protected def testCatalystToParquet(
+      testName: String,
+      sqlSchema: StructType,
+      parquetSchema: String,
+      binaryAsString: Boolean = true,
+      int96AsTimestamp: Boolean = true,
+      followParquetFormatSpec: Boolean = false,
+      isThriftDerived: Boolean = false): Unit = {
+    val converter = new CatalystSchemaConverter(
+      assumeBinaryIsString = binaryAsString,
+      assumeInt96IsTimestamp = int96AsTimestamp,
+      followParquetFormatSpec = followParquetFormatSpec)
+
+    test(s"sql => parquet: $testName") {
+      val actual = converter.convert(sqlSchema)
+      val expected = MessageTypeParser.parseMessageType(parquetSchema)
       actual.checkContains(expected)
       expected.checkContains(actual)
     }
   }
 
-  testSchema[(Boolean, Int, Long, Float, Double, Array[Byte])](
+  protected def testSchema(
+      testName: String,
+      sqlSchema: StructType,
+      parquetSchema: String,
+      binaryAsString: Boolean = true,
+      int96AsTimestamp: Boolean = true,
+      followParquetFormatSpec: Boolean = false,
+      isThriftDerived: Boolean = false): Unit = {
+
+    testCatalystToParquet(
+      testName,
+      sqlSchema,
+      parquetSchema,
+      binaryAsString,
+      int96AsTimestamp,
+      followParquetFormatSpec,
+      isThriftDerived)
+
+    testParquetToCatalyst(
+      testName,
+      sqlSchema,
+      parquetSchema,
+      binaryAsString,
+      int96AsTimestamp,
+      followParquetFormatSpec,
+      isThriftDerived)
+  }
+}
+
+class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
       |message root {
@@ -54,9 +137,10 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       |  required double  _5;
       |  optional binary  _6;
       |}
-    """.stripMargin)
+    """.stripMargin,
+    binaryAsString = false)
 
-  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
+  testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
       |message root {
@@ -68,27 +152,79 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  // Currently String is the only supported logical binary type.
-  testSchema[Tuple1[String]](
-    "binary logical types",
+  testSchemaInference[Tuple1[String]](
+    "string",
     """
       |message root {
       |  optional binary _1 (UTF8);
       |}
+    """.stripMargin,
+    binaryAsString = true)
+
+  testSchemaInference[Tuple1[Seq[Int]]](
+    "non-nullable array - non-standard",
+    """
+      |message root {
+      |  optional group _1 (LIST) {
+      |    repeated int32 element;
+      |  }
+      |}
     """.stripMargin)
 
-  testSchema[Tuple1[Seq[Int]]](
-    "array",
+  testSchemaInference[Tuple1[Seq[Int]]](
+    "non-nullable array - standard",
+    """
+      |message root {
+      |  optional group _1 (LIST) {
+      |    repeated group list {
+      |      required int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchemaInference[Tuple1[Seq[Integer]]](
+    "nullable array - non-standard",
     """
       |message root {
       |  optional group _1 (LIST) {
-      |    repeated int32 array;
+      |    repeated group bag {
+      |      optional int32 element;
+      |    }
       |  }
       |}
     """.stripMargin)
 
-  testSchema[Tuple1[Map[Int, String]]](
-    "map",
+  testSchemaInference[Tuple1[Seq[Integer]]](
+    "nullable array - standard",
+    """
+      |message root {
+      |  optional group _1 (LIST) {
+      |    repeated group list {
+      |      optional int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchemaInference[Tuple1[Map[Int, String]]](
+    "map - standard",
+    """
+      |message root {
+      |  optional group _1 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchemaInference[Tuple1[Map[Int, String]]](
+    "map - non-standard",
     """
       |message root {
       |  optional group _1 (MAP) {
@@ -100,7 +236,7 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[Tuple1[Pair[Int, String]]](
+  testSchemaInference[Tuple1[Pair[Int, String]]](
     "struct",
     """
       |message root {
@@ -109,20 +245,21 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       |    optional binary _2 (UTF8);
       |  }
       |}
-    """.stripMargin)
+    """.stripMargin,
+    followParquetFormatSpec = true)
 
-  testSchema[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
-    "deeply nested type",
+  testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+    "deeply nested type - non-standard",
     """
       |message root {
-      |  optional group _1 (MAP) {
-      |    repeated group map (MAP_KEY_VALUE) {
+      |  optional group _1 (MAP_KEY_VALUE) {
+      |    repeated group map {
       |      required int32 key;
       |      optional group value {
       |        optional binary _1 (UTF8);
       |        optional group _2 (LIST) {
       |          repeated group bag {
-      |            optional group array {
+      |            optional group element {
       |              required int32 _1;
       |              required double _2;
       |            }
@@ -134,43 +271,76 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[(Option[Int], Map[Int, Option[Double]])](
-    "optional types",
+  testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]](
+    "deeply nested type - standard",
     """
       |message root {
-      |  optional int32 _1;
-      |  optional group _2 (MAP) {
-      |    repeated group map (MAP_KEY_VALUE) {
+      |  optional group _1 (MAP) {
+      |    repeated group key_value {
       |      required int32 key;
-      |      optional double value;
+      |      optional group value {
+      |        optional binary _1 (UTF8);
+      |        optional group _2 (LIST) {
+      |          repeated group list {
+      |            optional group element {
+      |              required int32 _1;
+      |              required double _2;
+      |            }
+      |          }
+      |        }
+      |      }
       |    }
       |  }
       |}
-    """.stripMargin)
+    """.stripMargin,
+    followParquetFormatSpec = true)
 
-  // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
-  // as expected from attributes
-  testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
-    "thrift generated parquet schema",
+  testSchemaInference[(Option[Int], Map[Int, Option[Double]])](
+    "optional types",
     """
       |message root {
-      |  optional binary _1 (UTF8);
-      |  optional binary _2 (UTF8);
-      |  optional binary _3 (UTF8);
-      |  optional group _4 (LIST) {
-      |    repeated int32 _4_tuple;
-      |  }
-      |  optional group _5 (MAP) {
-      |    repeated group map (MAP_KEY_VALUE) {
-      |      required binary key (UTF8);
-      |      optional group value (LIST) {
-      |        repeated int32 value_tuple;
-      |      }
+      |  optional int32 _1;
+      |  optional group _2 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      optional double value;
       |    }
       |  }
       |}
-    """.stripMargin, isThriftDerived = true)
+    """.stripMargin,
+    followParquetFormatSpec = true)
 
+  // Parquet files generated by parquet-thrift are already handled by the schema converter, but
+  // let's leave this test here until both read path and write path are all updated.
+  ignore("thrift generated parquet schema") {
+    // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+    // as expected from attributes
+    testSchemaInference[(
+      Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+      "thrift generated parquet schema",
+      """
+        |message root {
+        |  optional binary _1 (UTF8);
+        |  optional binary _2 (UTF8);
+        |  optional binary _3 (UTF8);
+        |  optional group _4 (LIST) {
+        |    repeated int32 _4_tuple;
+        |  }
+        |  optional group _5 (MAP) {
+        |    repeated group map (MAP_KEY_VALUE) {
+        |      required binary key (UTF8);
+        |      optional group value (LIST) {
+        |        repeated int32 value_tuple;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+      isThriftDerived = true)
+  }
+}
+
+class ParquetSchemaSuite extends ParquetSchemaTest {
   test("DataType string parser compatibility") {
     // This is the generated string from previous versions of the Spark SQL, using the following:
     // val schema = StructType(List(
@@ -180,10 +350,7 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
       "StructType(List(StructField(c1,IntegerType,false), StructField(c2,BinaryType,true)))"
 
     // scalastyle:off
-    val jsonString =
-      """
-        |{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}
-      """.stripMargin
+    val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}"""
     // scalastyle:on
 
     val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString)
@@ -277,4 +444,465 @@ class ParquetSchemaSuite extends SparkFunSuite with ParquetTest {
           StructField("secondField", StringType, nullable = true))))
     }.getMessage.contains("detected conflicting schemas"))
   }
+
+  // =======================================================
+  // Tests for converting Parquet LIST to Catalyst ArrayType
+  // =======================================================
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with nullable element type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group list {
+      |      optional int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with nullable element type - 2",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group element {
+      |      optional int32 num;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
+    StructType(Seq(
+      StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group list {
+      |      required int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 2",
+    StructType(Seq(
+      StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group element {
+      |      required int32 num;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 3",
+    StructType(Seq(
+      StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated int32 element;
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 4",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(
+          StructType(Seq(
+            StructField("str", StringType, nullable = false),
+            StructField("num", IntegerType, nullable = false))),
+          containsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group element {
+      |      required binary str (UTF8);
+      |      required int32 num;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(
+          StructType(Seq(
+            StructField("str", StringType, nullable = false))),
+          containsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group array {
+      |      required binary str (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(
+          StructType(Seq(
+            StructField("str", StringType, nullable = false))),
+          containsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group f1_tuple {
+      |      required binary str (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  // =======================================================
+  // Tests for converting Catalyst ArrayType to Parquet LIST
+  // =======================================================
+
+  testCatalystToParquet(
+    "Backwards-compatibility: LIST with nullable element type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group list {
+      |      optional int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group bag {
+      |      optional int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: LIST with non-nullable element type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated group list {
+      |      required int32 element;
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x",
+    StructType(Seq(
+      StructField(
+        "f1",
+        ArrayType(IntegerType, containsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (LIST) {
+      |    repeated int32 element;
+      |  }
+      |}
+    """.stripMargin)
+
+  // ====================================================
+  // Tests for converting Parquet Map to Catalyst MapType
+  // ====================================================
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with non-nullable value type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      required binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with non-nullable value type - 2",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP_KEY_VALUE) {
+      |    repeated group map {
+      |      required int32 num;
+      |      required binary str (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      required binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with nullable value type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with nullable value type - 2",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP_KEY_VALUE) {
+      |    repeated group map {
+      |      required int32 num;
+      |      optional binary str (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  // ====================================================
+  // Tests for converting Catalyst MapType to Parquet Map
+  // ====================================================
+
+  testCatalystToParquet(
+    "Backwards-compatibility: MAP with non-nullable value type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      required binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = false),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      required binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: MAP with nullable value type - 1 - standard",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group key_value {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testCatalystToParquet(
+    "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x",
+    StructType(Seq(
+      StructField(
+        "f1",
+        MapType(IntegerType, StringType, valueContainsNull = true),
+        nullable = true))),
+    """message root {
+      |  optional group f1 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required int32 key;
+      |      optional binary value (UTF8);
+      |    }
+      |  }
+      |}
+    """.stripMargin)
+
+  // =================================
+  // Tests for conversion for decimals
+  // =================================
+
+  testSchema(
+    "DECIMAL(1, 0) - standard",
+    StructType(Seq(StructField("f1", DecimalType(1, 0)))),
+    """message root {
+      |  optional int32 f1 (DECIMAL(1, 0));
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchema(
+    "DECIMAL(8, 3) - standard",
+    StructType(Seq(StructField("f1", DecimalType(8, 3)))),
+    """message root {
+      |  optional int32 f1 (DECIMAL(8, 3));
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchema(
+    "DECIMAL(9, 3) - standard",
+    StructType(Seq(StructField("f1", DecimalType(9, 3)))),
+    """message root {
+      |  optional int32 f1 (DECIMAL(9, 3));
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchema(
+    "DECIMAL(18, 3) - standard",
+    StructType(Seq(StructField("f1", DecimalType(18, 3)))),
+    """message root {
+      |  optional int64 f1 (DECIMAL(18, 3));
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchema(
+    "DECIMAL(19, 3) - standard",
+    StructType(Seq(StructField("f1", DecimalType(19, 3)))),
+    """message root {
+      |  optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3));
+      |}
+    """.stripMargin,
+    followParquetFormatSpec = true)
+
+  testSchema(
+    "DECIMAL(1, 0) - prior to 1.4.x",
+    StructType(Seq(StructField("f1", DecimalType(1, 0)))),
+    """message root {
+      |  optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0));
+      |}
+    """.stripMargin)
+
+  testSchema(
+    "DECIMAL(8, 3) - prior to 1.4.x",
+    StructType(Seq(StructField("f1", DecimalType(8, 3)))),
+    """message root {
+      |  optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3));
+      |}
+    """.stripMargin)
+
+  testSchema(
+    "DECIMAL(9, 3) - prior to 1.4.x",
+    StructType(Seq(StructField("f1", DecimalType(9, 3)))),
+    """message root {
+      |  optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3));
+      |}
+    """.stripMargin)
+
+  testSchema(
+    "DECIMAL(18, 3) - prior to 1.4.x",
+    StructType(Seq(StructField("f1", DecimalType(18, 3)))),
+    """message root {
+      |  optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
+      |}
+    """.stripMargin)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a2e6665..f0aad8d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -638,7 +638,7 @@ class SQLQuerySuite extends QueryTest {
   test("SPARK-5203 union with different decimal precision") {
     Seq.empty[(Decimal, Decimal)]
       .toDF("d1", "d2")
-      .select($"d1".cast(DecimalType(10, 15)).as("d"))
+      .select($"d1".cast(DecimalType(10, 5)).as("d"))
       .registerTempTable("dn")
 
     sql("select d from dn union all select d * 2 from dn")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message