carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [35/56] [abbrv] carbondata git commit: fix timestamp format issue for partitioned table
Date Tue, 20 Jun 2017 07:29:41 GMT
fix timestamp format issue for partitioned table


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71934e91
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71934e91
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71934e91

Branch: refs/heads/streaming_ingest
Commit: 71934e915578b2022399d319fbecb50b2ee4fe35
Parents: 48cd1f6
Author: QiangCai <qiangcai@qq.com>
Authored: Thu Jun 15 21:02:29 2017 +0800
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Thu Jun 15 19:30:19 2017 +0530

----------------------------------------------------------------------
 .../filter/partition/PartitionFilterUtil.java   | 21 +++++++++-----
 .../scan/filter/partition/RangeFilterImpl.java  | 17 +++++++++--
 .../core/scan/partition/ListPartitioner.java    | 14 ++++++++-
 .../core/scan/partition/PartitionUtil.java      | 30 +++-----------------
 .../core/scan/partition/RangePartitioner.java   | 15 +++++++++-
 .../TestAllDataTypeForPartitionTable.scala      |  8 ++----
 .../partition/TestDDLForPartitionTable.scala    |  7 +++++
 .../TestDataLoadingForPartitionTable.scala      | 14 ++-------
 .../partition/TestQueryForPartitionTable.scala  |  3 ++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 26 +++++++++++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 26 +++++++++++++----
 11 files changed, 116 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
index e49a39a..1ab2ae6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.scan.filter.partition;
 
 import java.math.BigDecimal;
+import java.text.DateFormat;
 import java.util.BitSet;
 import java.util.Comparator;
 import java.util.List;
@@ -116,7 +117,8 @@ public class PartitionFilterUtil {
    * @return
    */
   public static BitSet getPartitionMapForRangeFilter(PartitionInfo partitionInfo,
-      ListPartitioner partitioner, Object filterValue,  boolean isGreaterThan, boolean isEqualTo)
{
+      ListPartitioner partitioner, Object filterValue,  boolean isGreaterThan, boolean isEqualTo,
+      DateFormat timestampFormatter, DateFormat dateFormatter) {
 
     List<List<String>> values = partitionInfo.getListInfo();
     DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
@@ -135,7 +137,8 @@ public class PartitionFilterUtil {
         outer1:
         for (int i = 0; i < partitions; i++) {
           for (String value : values.get(i)) {
-            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType);
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
+                timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) >= 0) {
               partitionMap.set(i);
               continue outer1;
@@ -147,7 +150,8 @@ public class PartitionFilterUtil {
         outer2:
         for (int i = 0; i < partitions; i++) {
           for (String value : values.get(i)) {
-            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType);
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
+                timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) > 0) {
               partitionMap.set(i);
               continue outer2;
@@ -161,7 +165,8 @@ public class PartitionFilterUtil {
         outer3:
         for (int i = 0; i < partitions; i++) {
           for (String value : values.get(i)) {
-            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType);
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
+                timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) <= 0) {
               partitionMap.set(i);
               continue outer3;
@@ -173,7 +178,8 @@ public class PartitionFilterUtil {
         outer4:
         for (int i = 0; i < partitions; i++) {
           for (String value : values.get(i)) {
-            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType);
+            Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
+                timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) < 0) {
               partitionMap.set(i);
               continue outer4;
@@ -196,7 +202,8 @@ public class PartitionFilterUtil {
    * @return
    */
   public static BitSet getPartitionMapForRangeFilter(PartitionInfo partitionInfo,
-      RangePartitioner partitioner, Object filterValue, boolean isGreaterThan, boolean isEqualTo)
{
+      RangePartitioner partitioner, Object filterValue, boolean isGreaterThan, boolean isEqualTo,
+      DateFormat timestampFormatter, DateFormat dateFormatter) {
 
     List<String> values = partitionInfo.getRangeInfo();
     DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
@@ -213,7 +220,7 @@ public class PartitionFilterUtil {
     // find the partition of filter value
     for (; partitionIndex < numPartitions; partitionIndex++) {
       result = comparator.compare(filterValue, PartitionUtil.getDataBasedOnDataType(
-          values.get(partitionIndex), partitionColumnDataType));
+          values.get(partitionIndex), partitionColumnDataType, timestampFormatter, dateFormatter));
       if (result <= 0) {
         break;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
index 697be32..0124d2b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/RangeFilterImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.carbondata.core.scan.filter.partition;
 
+import java.text.SimpleDateFormat;
 import java.util.BitSet;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.partition.ListPartitioner;
@@ -26,6 +28,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil;
 import org.apache.carbondata.core.scan.partition.Partitioner;
 import org.apache.carbondata.core.scan.partition.RangePartitioner;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * the implement of Range filter(include <=, <, >=, >)
@@ -47,13 +50,22 @@ public class RangeFilterImpl implements PartitionFilterIntf {
 
   @Override public BitSet applyFilter(Partitioner partitioner) {
 
+    SimpleDateFormat timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+    SimpleDateFormat dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
     switch (partitionInfo.getPartitionType()) {
       case LIST:
         Object filterValueOfList = PartitionUtil.getDataBasedOnDataTypeForFilter(
             literal.getLiteralExpValue().toString(),
             partitionInfo.getColumnSchemaList().get(0).getDataType());
         return PartitionFilterUtil.getPartitionMapForRangeFilter(partitionInfo,
-            (ListPartitioner) partitioner, filterValueOfList, isGreaterThan, isEqualTo);
+            (ListPartitioner) partitioner, filterValueOfList, isGreaterThan, isEqualTo,
+            timestampFormatter, dateFormatter);
       case RANGE:
         Object filterValueOfRange = PartitionUtil.getDataBasedOnDataTypeForFilter(
             literal.getLiteralExpValue().toString(),
@@ -62,7 +74,8 @@ public class RangeFilterImpl implements PartitionFilterIntf {
           filterValueOfRange = ByteUtil.toBytes((String)filterValueOfRange);
         }
         return PartitionFilterUtil.getPartitionMapForRangeFilter(partitionInfo,
-            (RangePartitioner) partitioner, filterValueOfRange, isGreaterThan, isEqualTo);
+            (RangePartitioner) partitioner, filterValueOfRange, isGreaterThan, isEqualTo,
+            timestampFormatter, dateFormatter);
       default:
         return PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), true);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
index bab2ede..a2004bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
@@ -17,17 +17,28 @@
 
 package org.apache.carbondata.core.scan.partition;
 
+import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * List Partitioner
  */
 public class ListPartitioner implements Partitioner {
 
+  private SimpleDateFormat timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+  private SimpleDateFormat dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+          CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
   /**
    * map the value of ListPartition to partition id.
    */
@@ -41,7 +52,8 @@ public class ListPartitioner implements Partitioner {
     numPartitions = values.size();
     for (int i = 0; i < numPartitions; i++) {
       for (String value : values.get(i)) {
-        map.put(PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType), i);
+        map.put(PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
+            timestampFormatter, dateFormatter), i);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
index aab356c..5921326 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
@@ -19,38 +19,15 @@ package org.apache.carbondata.core.scan.partition;
 
 import java.math.BigDecimal;
 import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.BitSet;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
 
 import org.apache.commons.lang.StringUtils;
 
 public class PartitionUtil {
 
-  private static LogService LOGGER = LogServiceFactory.getLogService(PartitionUtil.class.getName());
-
-  private static final ThreadLocal<DateFormat> timestampFormatter = new ThreadLocal<DateFormat>()
{
-    @Override protected DateFormat initialValue() {
-      return new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
-    }
-  };
-
-  private static final ThreadLocal<DateFormat> dateFormatter = new ThreadLocal<DateFormat>()
{
-    @Override protected DateFormat initialValue() {
-      return new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
-    }
-  };
-
   public static Partitioner getPartitioner(PartitionInfo partitionInfo) {
     switch (partitionInfo.getPartitionType()) {
       case HASH:
@@ -65,7 +42,8 @@ public class PartitionUtil {
     }
   }
 
-  public static Object getDataBasedOnDataType(String data, DataType actualDataType) {
+  public static Object getDataBasedOnDataType(String data, DataType actualDataType,
+      DateFormat timestampFormatter, DateFormat dateFormatter) {
     if (data == null) {
       return null;
     }
@@ -85,9 +63,9 @@ public class PartitionUtil {
         case LONG:
           return Long.parseLong(data);
         case DATE:
-          return PartitionUtil.dateFormatter.get().parse(data).getTime();
+          return dateFormatter.parse(data).getTime();
         case TIMESTAMP:
-          return PartitionUtil.timestampFormatter.get().parse(data).getTime();
+          return timestampFormatter.parse(data).getTime();
         case DECIMAL:
           return new BigDecimal(data);
         default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
index c73f0b5..3c02736 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
@@ -19,11 +19,14 @@ package org.apache.carbondata.core.scan.partition;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * Range Partitioner
@@ -34,6 +37,14 @@ public class RangePartitioner implements Partitioner {
   private Object[] bounds;
   private SerializableComparator comparator;
 
+  private SimpleDateFormat timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+  private SimpleDateFormat dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+          CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
   public RangePartitioner(PartitionInfo partitionInfo) {
     List<String> values = partitionInfo.getRangeInfo();
     DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
@@ -45,7 +56,8 @@ public class RangePartitioner implements Partitioner {
       }
     } else {
       for (int i = 0; i < numPartitions; i++) {
-        bounds[i] = PartitionUtil.getDataBasedOnDataType(values.get(i), partitionColumnDataType);
+        bounds[i] = PartitionUtil.getDataBasedOnDataType(values.get(i), partitionColumnDataType,
+            timestampFormatter, dateFormatter);
       }
     }
 
@@ -75,6 +87,7 @@ public class RangePartitioner implements Partitioner {
   /**
    * number of partitions
    * add extra default partition
+   *
    * @return
    */
   @Override public int numPartitions() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
index 1b69eda..40384ff 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
@@ -23,13 +23,11 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-
 import scala.collection.mutable
 
-class TestAllDataTypeForPartitionTable extends QueryTest with BeforeAndAfterAll {
+import org.apache.spark.sql.test.TestQueryExecutor
 
-  val defaultTimestampFormat = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+class TestAllDataTypeForPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
     CarbonProperties.getInstance()
@@ -41,7 +39,7 @@ class TestAllDataTypeForPartitionTable extends QueryTest with BeforeAndAfterAll
 
   override def afterAll = {
     CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
       .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
 
     dropTable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index 6bd66ad..6aa259e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -18,17 +18,22 @@
 package org.apache.carbondata.spark.testsuite.partition
 
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.util.CarbonProperties
 
 class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll = {
     dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
 
   test("create partition table: hash partition") {
@@ -132,6 +137,8 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll
{
 
   override def afterAll = {
     dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
   }
 
   def dropTable = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index b5858b4..b1f4ba4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -17,6 +17,7 @@
 package org.apache.carbondata.spark.testsuite.partition
 
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
@@ -28,9 +29,6 @@ import org.apache.spark.sql.Row
 
 class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
 
-  val defaultTimestampFormat = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
-
   override def beforeAll {
     dropTable
 
@@ -293,14 +291,8 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
 
   override def afterAll = {
     dropTable
-    if (defaultTimestampFormat == null) {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    } else {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, defaultTimestampFormat)
-    }
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
   }
 
   def dropTable = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
index 57861a9..66d98f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestQueryForPartitionTable.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.testsuite.partition
 
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -141,6 +142,8 @@ class TestQueryForPartitionTable  extends QueryTest with BeforeAndAfterAll
{
 
   override def afterAll = {
     dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
   }
 
   def dropTable = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3dcf8af..65235e6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1012,15 +1012,28 @@ object CarbonDataRDDFactory {
     if (partitionColumnIndex == -1) {
       throw new DataLoadingException("Partition column not found.")
     }
-    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
-    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
-      // input data from DataFrame
+
+    val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat())
+    val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase))
+    val timeStampFormat = if (specificFormat.isDefined) {
+      new SimpleDateFormat(specificFormat.get)
+    } else {
       val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-      val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+      new SimpleDateFormat(timestampFormatString)
+    }
+
+    val dateFormat = if (specificFormat.isDefined) {
+      new SimpleDateFormat(specificFormat.get)
+    } else {
       val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-      val dateFormat = new SimpleDateFormat(dateFormatString)
+      new SimpleDateFormat(dateFormatString)
+    }
+
+    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+      // input data from DataFrame
       val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
       val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
       val serializationNullFormat =
@@ -1074,7 +1087,8 @@ object CarbonDataRDDFactory {
       }
     } else {
       inputRDD.map { row =>
-        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat,
+          dateFormat), row._2)
       }
         .partitionBy(partitioner)
         .map(_._2)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71934e91/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 271b56b..48af516 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1036,15 +1036,28 @@ object CarbonDataRDDFactory {
     if (partitionColumnIndex == -1) {
       throw new DataLoadingException("Partition column not found.")
     }
-    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
-    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
-      // input data from DataFrame
+
+    val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat())
+    val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase))
+    val timeStampFormat = if (specificFormat.isDefined) {
+      new SimpleDateFormat(specificFormat.get)
+    } else {
       val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-      val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+      new SimpleDateFormat(timestampFormatString)
+    }
+
+    val dateFormat = if (specificFormat.isDefined) {
+      new SimpleDateFormat(specificFormat.get)
+    } else {
       val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-      val dateFormat = new SimpleDateFormat(dateFormatString)
+      new SimpleDateFormat(dateFormatString)
+    }
+
+    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+      // input data from DataFrame
       val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
       val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
       val serializationNullFormat =
@@ -1098,7 +1111,8 @@ object CarbonDataRDDFactory {
       }
     } else {
       inputRDD.map { row =>
-        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType, timeStampFormat,
+          dateFormat), row._2)
       }
         .partitionBy(partitioner)
         .map(_._2)


Mime
View raw message