spark-commits mailing list archives

From marmb...@apache.org
Subject git commit: [SPARK-2927][SQL] Add a conf to configure if we always read Binary columns stored in Parquet as String columns
Date Thu, 14 Aug 2014 17:46:37 GMT
Repository: spark
Updated Branches:
  refs/heads/master 078f3fbda -> add75d483


[SPARK-2927][SQL] Add a conf to configure if we always read Binary columns stored in Parquet as String columns

This PR adds a new conf flag `spark.sql.parquet.binaryAsString`. When it is `true` and there
is no Spark SQL schema stored in the Parquet metadata to provide the schema of the data, we
always treat binary fields stored in Parquet as string fields. This conf provides a way to
read string columns that were written without the UTF8 annotation.

JIRA: https://issues.apache.org/jira/browse/SPARK-2927
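
As a usage sketch (not part of this commit), assuming an existing SQLContext named `sqlContext`
and a Parquet file at `path` whose string data was written as plain binary without the UTF8
annotation:

    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
    // With the flag enabled, un-annotated binary columns are inferred as StringType.
    val data = sqlContext.parquetFile(path)
    data.registerTempTable("binary_as_string_test")
    sqlContext.sql("SELECT * FROM binary_as_string_test").collect()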

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #1855 from yhuai/parquetBinaryAsString and squashes the following commits:

689ffa9 [Yin Huai] Add missing "=".
80827de [Yin Huai] Unit test.
1765ca4 [Yin Huai] Use .toBoolean.
9d3f199 [Yin Huai] Merge remote-tracking branch 'upstream/master' into parquetBinaryAsString
5d436a1 [Yin Huai] The initial support of adding a conf to treat binary columns stored in Parquet as string columns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/add75d48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/add75d48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/add75d48

Branch: refs/heads/master
Commit: add75d4831fdc35712bf8b737574ea0bc677c37c
Parents: 078f3fb
Author: Yin Huai <huai@cse.ohio-state.edu>
Authored: Thu Aug 14 10:46:33 2014 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Thu Aug 14 10:46:33 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    | 10 +++-
 .../spark/sql/parquet/ParquetRelation.scala     |  6 ++-
 .../spark/sql/parquet/ParquetTableSupport.scala |  3 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 36 +++++++------
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 54 ++++++++++++++++++--
 5 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/add75d48/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 35c51de..90de111 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -31,6 +31,7 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -87,8 +88,7 @@ trait SQLConf {
    *
    * Defaults to false as this feature is currently experimental.
    */
-  private[spark] def codegenEnabled: Boolean =
-    if (getConf(CODEGEN_ENABLED, "false") == "true") true else false
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
 
   /**
   * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -108,6 +108,12 @@ trait SQLConf {
   private[spark] def defaultSizeInBytes: Long =
     getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong
 
+  /**
+   * When set to true, we always treat byte arrays in Parquet files as strings.
+   */
+  private[spark] def isParquetBinaryAsString: Boolean =
+    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/add75d48/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b3bae5d..053b2a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -60,7 +60,11 @@ private[sql] case class ParquetRelation(
       .getSchema
 
   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    ParquetTypesConverter.readSchemaFromFile(
+      new Path(path),
+      conf,
+      sqlContext.isParquetBinaryAsString)
 
   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/add75d48/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 6d4ce32..6a657c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       }
     }
     // if both unavailable, fall back to deducing the schema from the given Parquet schema
+    // TODO: Why can this be null?
     if (schema == null)  {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/add75d48/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 37091bc..b0579f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass
 
-  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+  def toPrimitiveDataType(
+      parquetType: ParquetPrimitiveType,
+      binaryAsString: Boolean): DataType =
     parquetType.getPrimitiveTypeName match {
       case ParquetPrimitiveTypeName.BINARY
-        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+        if (parquetType.getOriginalType == ParquetOriginalType.UTF8 ||
+          binaryAsString) => StringType
       case ParquetPrimitiveTypeName.BINARY => BinaryType
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
@@ -85,7 +88,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param parquetType The type to convert.
    * @return The corresponding Catalyst type.
    */
-  def toDataType(parquetType: ParquetType): DataType = {
+  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
     def correspondsToMap(groupType: ParquetGroupType): Boolean = {
       if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
         false
@@ -107,7 +110,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
 
     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -116,7 +119,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field), containsNull = false)
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -126,9 +129,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(
             keyValueGroup.getFieldCount == 2,
             "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
           // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
           // at here.
@@ -138,22 +141,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
           // Note: the order of these checks is important!
           if (correspondsToMap(groupType)) { // MapType
             val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-            val keyType = toDataType(keyValueGroup.getFields.apply(0))
+            val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
             assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-            val valueType = toDataType(keyValueGroup.getFields.apply(1))
+            val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
             assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
             // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
             // at here.
             MapType(keyType, valueType)
           } else if (correspondsToArray(groupType)) { // ArrayType
-            val elementType = toDataType(groupType.getFields.apply(0))
+            val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
             ArrayType(elementType, containsNull = false)
           } else { // everything else: StructType
             val fields = groupType
               .getFields
               .map(ptype => new StructField(
               ptype.getName,
-              toDataType(ptype),
+              toDataType(ptype, isBinaryAsString),
               ptype.getRepetition != Repetition.REQUIRED))
             StructType(fields)
           }
@@ -276,7 +279,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }
 
-  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
     parquetSchema
       .asGroupType()
       .getFields
@@ -284,7 +287,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         field =>
           new AttributeReference(
             field.getName,
-            toDataType(field),
+            toDataType(field, isBinaryAsString),
             field.getRepetition != Repetition.REQUIRED)())
   }
 
@@ -404,7 +407,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+  def readSchemaFromFile(
+      origPath: Path,
+      conf: Option[Configuration],
+      isBinaryAsString: Boolean): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
       readMetaData(origPath, conf)
         .getFileMetaData
@@ -413,7 +419,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }

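For illustration only (not part of this commit): the effect of the new flag on schema conversion
can be exercised directly. The sketch below assumes it runs inside the org.apache.spark.sql.parquet
package (ParquetTypesConverter is private[parquet]) and uses parquet.schema.MessageTypeParser to
build a Parquet schema; the column names are hypothetical.

    import parquet.schema.MessageTypeParser

    val parquetSchema = MessageTypeParser.parseMessageType(
      "message spark_schema { required int32 c1; required binary c2; }")

    // Without the flag, c2 converts to BinaryType; with it, to StringType.
    val asBinary = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
    val asString = ParquetTypesConverter.convertToAttributes(parquetSchema, true)
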
http://git-wip-us.apache.org/repos/asf/spark/blob/add75d48/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 502f670..172dcd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -21,8 +21,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
 
 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
-import parquet.schema.MessageTypeParser
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
 
@@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
@@ -138,6 +135,57 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
     }
   }
 
+  test("Treat binary as string") {
+    val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+
+    // Create the test file.
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val range = (0 to 255)
+    val rowRDD = TestSQLContext.sparkContext.parallelize(range)
+      .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
+    // We need to ask Parquet to store the String column as a Binary column.
+    val schema = StructType(
+      StructField("c1", IntegerType, false) ::
+      StructField("c2", BinaryType, false) :: Nil)
+    val schemaRDD1 = applySchema(rowRDD, schema)
+    schemaRDD1.saveAsParquetFile(path)
+    val resultWithBinary = parquetFile(path).collect
+    range.foreach {
+      i =>
+        assert(resultWithBinary(i).getInt(0) === i)
+        assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
+    }
+
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
+    // This ParquetRelation always uses Parquet types to derive its output.
+    val parquetRelation = new ParquetRelation(
+      path.toString,
+      Some(TestSQLContext.sparkContext.hadoopConfiguration),
+      TestSQLContext) {
+      override val output =
+        ParquetTypesConverter.convertToAttributes(
+          ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
+          TestSQLContext.isParquetBinaryAsString)
+    }
+    val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
+    val resultWithString = schemaRDD.collect
+    range.foreach {
+      i =>
+        assert(resultWithString(i).getInt(0) === i)
+        assert(resultWithString(i)(1) === s"val_$i")
+    }
+
+    schemaRDD.registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)



