spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-4985] [SQL] parquet support for date type
Date Mon, 23 Mar 2015 03:46:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2bf40c58e -> 4659468f3


[SPARK-4985] [SQL] parquet support for date type

This PR might have some issues with #3732, and it would have merge conflicts
with #3820, so the review can be delayed until those two are merged.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #3822 from adrian-wang/parquetdate and squashes the following commits:

2c5d54d [Daoyuan Wang] add a test case
faef887 [Daoyuan Wang] parquet support for primitive date
97e9080 [Daoyuan Wang] parquet support for date type


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4659468f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4659468f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4659468f

Branch: refs/heads/master
Commit: 4659468f369d69e7f777130e5e3b4c5d47a624f1
Parents: 2bf40c5
Author: Daoyuan Wang <daoyuan.wang@intel.com>
Authored: Mon Mar 23 11:46:16 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Mon Mar 23 11:46:16 2015 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/ParquetConverter.scala  | 12 ++++++++++++
 .../spark/sql/parquet/ParquetTableSupport.scala      |  2 ++
 .../org/apache/spark/sql/parquet/ParquetTypes.scala  |  4 ++++
 .../apache/spark/sql/parquet/ParquetIOSuite.scala    | 15 +++++++++++++++
 .../spark/sql/parquet/ParquetSchemaSuite.scala       |  3 ++-
 5 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
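
A minimal end-to-end sketch of what this change enables, using the Spark
1.3-era API that also appears in the test below; the SparkContext `sc`, the
output path, and the column name are assumptions for illustration, not part
of the patch:

import java.sql.Date
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`
import sqlContext.implicits._

// With this patch, a date column round-trips through Parquet: it is written
// as INT32 annotated with DATE and read back as DateType.
val df = sc.parallelize(Seq(
  Tuple1(Date.valueOf("2015-03-23")),
  Tuple1(Date.valueOf("1970-01-01"))
)).toDF("d")

df.saveAsParquetFile("/tmp/dates.parquet")
sqlContext.parquetFile("/tmp/dates.parquet").collect().foreach(println)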


http://git-wip-us.apache.org/repos/asf/spark/blob/4659468f/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index f898e4b..43ca359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
             parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
         }
       }
+      case DateType => {
+        new CatalystPrimitiveConverter(parent, fieldIndex) {
+          override def addInt(value: Int): Unit =
+            parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
+        }
+      }
       case d: DecimalType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addBinary(value: Binary): Unit =
@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     updateField(fieldIndex, value)
 
+  protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    updateField(fieldIndex, value)
+
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)
 
@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     current.setInt(fieldIndex, value)
 
+  override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    current.update(fieldIndex, value)
+
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
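
The converter stores the raw INT32 value, i.e. days since the Unix epoch,
directly in the row; Spark's internal DateUtils converts that to
java.sql.Date later. A rough standalone sketch of that conversion
(DateUtils itself is internal API; this version assumes UTC and skips the
time-zone handling the real one does):

import java.sql.Date
import java.util.concurrent.TimeUnit

// days-since-epoch -> java.sql.Date (UTC-only approximation)
def toJavaDate(days: Int): Date =
  new Date(TimeUnit.DAYS.toMillis(days.toLong))

// java.sql.Date -> days-since-epoch (UTC-only approximation)
def fromJavaDate(date: Date): Int =
  TimeUnit.MILLISECONDS.toDays(date.getTime).toInt

assert(fromJavaDate(toJavaDate(16516)) == 16516)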
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4659468f/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 19bfba3..5a1b154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
         case DoubleType => writer.addDouble(value.asInstanceOf[Double])
         case FloatType => writer.addFloat(value.asInstanceOf[Float])
         case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+        case DateType => writer.addInteger(value.asInstanceOf[Int])
         case d: DecimalType =>
           if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
             sys.error(s"Unsupported datatype $d, cannot write to consumer")
@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case DoubleType => writer.addDouble(record.getDouble(index))
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
+      case DateType => writer.addInteger(record.getInt(index))
       case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
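
On the write path no conversion is needed: the internal row already holds a
date as an Int (days since epoch), so it is emitted as a plain INT32. A
sketch of that pattern, with the RecordConsumer, field name, and index
assumed for illustration (the package was parquet.io.api in this era,
org.apache.parquet.io.api later):

import parquet.io.api.RecordConsumer

// Emits one date field; in real use this runs between startMessage()
// and endMessage() on the consumer.
def writeDateField(consumer: RecordConsumer, index: Int, daysSinceEpoch: Int): Unit = {
  consumer.startField("_1", index)
  consumer.addInteger(daysSinceEpoch)  // the Int is already days since epoch
  consumer.endField("_1", index)
}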

http://git-wip-us.apache.org/repos/asf/spark/blob/4659468f/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 5209581..da668f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
       case ParquetPrimitiveTypeName.FLOAT => FloatType
+      case ParquetPrimitiveTypeName.INT32
+        if originalType == ParquetOriginalType.DATE => DateType
       case ParquetPrimitiveTypeName.INT32 => IntegerType
       case ParquetPrimitiveTypeName.INT64 => LongType
       case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // There is no type for Byte or Short so we promote them to INT32.
     case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
     case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+    case DateType => Some(ParquetTypeInfo(
+      ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
     case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
     case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
     case DecimalType.Fixed(precision, scale) if precision <= 18 =>
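
The net effect on the schema side: DateType now maps to INT32 carrying the
DATE original type, and that annotation is what routes INT32 back to
DateType on read instead of IntegerType. A sketch of the resulting Parquet
schema, parsed with parquet-mr's MessageTypeParser (package parquet.schema
in this era; the column name is an assumption):

import parquet.schema.MessageTypeParser

// Without the (DATE) annotation, the same column would read back as IntegerType.
val schema = MessageTypeParser.parseMessageType(
  """message root {
    |  optional int32 d (DATE);
    |}""".stripMargin)
println(schema)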

http://git-wip-us.apache.org/repos/asf/spark/blob/4659468f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 5438095..203bc79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("date type") {
+    def makeDateRDD(): DataFrame =
+      sparkContext
+        .parallelize(0 to 1000)
+        .map(i => Tuple1(DateUtils.toJavaDate(i)))
+        .toDF()
+        .select($"_1")
+
+    withTempPath { dir =>
+      val data = makeDateRDD()
+      data.saveAsParquetFile(dir.getCanonicalPath)
+      checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+    }
+  }
+
   test("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)

http://git-wip-us.apache.org/repos/asf/spark/blob/4659468f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index ad880e2..321832c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -57,7 +57,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[(Byte, Short, Int, Long)](
+  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
       |message root {
@@ -65,6 +65,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |  required int32 _2 (INT_16);
       |  required int32 _3 (INT_32);
       |  required int64 _4 (INT_64);
+      |  optional int32 _5 (DATE);
       |}
     """.stripMargin)
 



