carbondata-commits mailing list archives

From jack...@apache.org
Subject [12/50] [abbrv] carbondata git commit: [CARBONDATA-2036] Fix the insert static partition with integer values prefix with 0 not working
Date Wed, 31 Jan 2018 05:22:32 GMT
[CARBONDATA-2036] Fix the insert static partition with integer values prefix with 0 not working

Insert overwrite into a static partition fails when an integer partition value is given with a leading 0.
Example: create table test(d1 string) partitioned by (c1 int, c2 int, c3 int), then run
insert overwrite table test partition(01, 02, 03) select s1.
This fails because 01 is not converted to an actual integer before it is written to the partition map file.
Solution: convert the partition values to their corresponding datatype values before adding them to the partition map file.
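
A minimal reproduction in the style of the test added below (the table and literal values are illustrative, taken from the description above):

    sql("CREATE TABLE test(d1 string) PARTITIONED BY (c1 int, c2 int, c3 int) " +
      "STORED BY 'org.apache.carbondata.format'")
    sql("insert into test partition(c1=1, c2=2, c3=3) select 's1'")
    // Before this fix the zero-prefixed values below were written to the partition
    // map file as the strings "01", "02", "03" rather than the integers 1, 2, 3,
    // so the partition map entries did not match the actual integer partition values.
    sql("insert overwrite table test partition(c1=01, c2=02, c3=03) select 's1'")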

This closes #1833


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b4dc866f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b4dc866f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b4dc866f

Branch: refs/heads/carbonstore
Commit: b4dc866fec0a42196435c6da0e1413dc2a2398d1
Parents: 937868d
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Thu Jan 18 18:51:50 2018 +0530
Committer: kumarvishal <kumarvishal.1802@gmail.com>
Committed: Sat Jan 20 00:14:24 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  38 +++-
 .../core/indexstore/UnsafeMemoryDMStore.java    |  22 ++-
 .../carbondata/core/util/CarbonProperties.java  |  52 ++---
 .../core/CarbonPropertiesValidationTest.java    |   2 +-
 ...tandardPartitionTableOverwriteTestCase.scala |  20 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 189 ++++++++++---------
 .../carbondata/spark/util/CarbonScalaUtil.scala |  10 +-
 .../management/CarbonLoadDataCommand.scala      |  29 ++-
 .../CarbonProjectForUpdateCommand.scala         |   2 +-
 .../datasources/CarbonFileFormat.scala          |  61 ++----
 .../apache/spark/sql/hive/CarbonMetaStore.scala |   4 +-
 .../src/main/spark2.1/CarbonSQLConf.scala       |  12 +-
 .../src/main/spark2.2/CarbonSqlConf.scala       |  12 +-
 13 files changed, 246 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cd7abe0..13c8a42 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1299,11 +1299,6 @@ public final class CarbonCommonConstants {
 
   @CarbonProperty
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
-  public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
-
-  @CarbonProperty
-  public static final String CARBON_COMBINE_SMALL_INPUT_FILES = "carbon.mergeSmallFileRead.enable";
-  public static final String CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT = "false";
 
   public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
   @CarbonProperty
@@ -1404,9 +1399,38 @@ public final class CarbonCommonConstants {
 
   public static final String USE_DISTRIBUTED_DATAMAP_DEFAULT = "false";
 
-  public static final String CARBON_USE_BLOCKLET_DISTRIBUTION = "carbon.blocklet.distribution";
+  /**
+   * This property defines how the tasks are splitted/combined and launch spark tasks during query
+   */
+  @CarbonProperty
+  public static final String CARBON_TASK_DISTRIBUTION = "carbon.task.distribution";
+
+  /**
+   * It combines the available blocks as per the maximum available tasks in the cluster.
+   */
+  public static final String CARBON_TASK_DISTRIBUTION_CUSTOM = "custom";
+
+  /**
+   * It creates the splits as per the number of blocks/carbondata files available for query.
+   */
+  public static final String CARBON_TASK_DISTRIBUTION_BLOCK = "block";
+
+  /**
+   * It creates the splits as per the number of blocklets available for query.
+   */
+  public static final String CARBON_TASK_DISTRIBUTION_BLOCKLET = "blocklet";
+
+  /**
+   * It merges all the small files and create tasks as per the configurable partition size.
+   */
+  public static final String CARBON_TASK_DISTRIBUTION_MERGE_FILES = "merge_small_files";
+
+  /**
+   * Default task distribution.
+   */
+  public static final String CARBON_TASK_DISTRIBUTION_DEFAULT = CARBON_TASK_DISTRIBUTION_BLOCK;
+
 
-  public static final String CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT = "true";
   /**
    * The property to configure the mdt file folder path, earlier it was pointing to the
    * fixed carbon store path. This is needed in case of the federation setup when user removes

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index dc630ff..31ecac2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -36,7 +36,7 @@ public class UnsafeMemoryDMStore {
 
   private MemoryBlock memoryBlock;
 
-  private static int capacity = 8 * 1024 * 1024;
+  private static int capacity = 8 * 1024;
 
   private int allocatedSize;
 
@@ -66,14 +66,8 @@ public class UnsafeMemoryDMStore {
    * @param rowSize
    */
   private void ensureSize(int rowSize) throws MemoryException {
-    if (runningLength + rowSize >= allocatedSize) {
-      MemoryBlock allocate =
-          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity);
-      getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
-          allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
-      allocatedSize = allocatedSize + capacity;
-      memoryBlock = allocate;
+    while (runningLength + rowSize >= allocatedSize) {
+      increaseMemory();
     }
     if (this.pointers.length <= rowCount + 1) {
       int[] newPointer = new int[pointers.length + 1000];
@@ -82,6 +76,16 @@ public class UnsafeMemoryDMStore {
     }
   }
 
+  private void increaseMemory() throws MemoryException {
+    MemoryBlock allocate =
+        UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + capacity);
+    getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
+        allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
+    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock);
+    allocatedSize = allocatedSize + capacity;
+    memoryBlock = allocate;
+  }
+
   /**
    * Add the index row to unsafe.
    *
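
The ensureSize change grows the buffer in a loop because a single fixed-size increment (now 8 KB rather than 8 MB) may not cover a large incoming row. A simplified, self-contained sketch of the pattern (not the CarbonData API):

    // Grow-in-a-loop: one increment of `step` may be smaller than the incoming row.
    final class GrowableStore(step: Int) {
      private var allocated = step
      private var used = 0
      def ensureSize(rowSize: Int): Unit = {
        while (used + rowSize >= allocated) {
          allocated += step   // the real code reallocates and copies the memory block
        }
      }
      def add(rowSize: Int): Unit = { ensureSize(rowSize); used += rowSize }
    }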

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index a918611..fd78efc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -35,25 +35,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.BLOCKLET_SIZE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATA_FILE_VERSION;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATE_FORMAT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.HANDOFF_SIZE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.LOCK_TYPE;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_BLOCK_SORT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
 import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB;
 import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO;
 
@@ -151,8 +133,8 @@ public final class CarbonProperties {
       case HANDOFF_SIZE:
         validateHandoffSize();
         break;
-      case CARBON_COMBINE_SMALL_INPUT_FILES:
-        validateCombineSmallInputFiles();
+      case CARBON_TASK_DISTRIBUTION:
+        validateCarbonTaskDistribution();
         break;
       // The method validate the validity of configured carbon.timestamp.format value
       // and reset to default value if validation fail
@@ -199,7 +181,7 @@ public final class CarbonProperties {
     validateLockType();
     validateCarbonCSVReadBufferSizeByte();
     validateHandoffSize();
-    validateCombineSmallInputFiles();
+    validateCarbonTaskDistribution();
     // The method validate the validity of configured carbon.timestamp.format value
     // and reset to default value if validation fail
     validateTimeFormatKey(CARBON_TIMESTAMP_FORMAT,
@@ -361,22 +343,24 @@ public final class CarbonProperties {
     if (!isValidBooleanValue) {
       LOGGER.warn("The custom block distribution value \"" + customBlockDistributionStr
           + "\" is invalid. Using the default value \""
-          + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
-      carbonProperties.setProperty(CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
+          + false);
+      carbonProperties.setProperty(CARBON_CUSTOM_BLOCK_DISTRIBUTION, "false");
     }
   }
 
-  private void validateCombineSmallInputFiles() {
-    String combineSmallInputFilesStr =
-        carbonProperties.getProperty(CARBON_COMBINE_SMALL_INPUT_FILES);
-    boolean isValidBooleanValue = CarbonUtil.validateBoolean(combineSmallInputFilesStr);
-    if (!isValidBooleanValue) {
-      LOGGER.warn("The combine small files value \"" + combineSmallInputFilesStr
+  private void validateCarbonTaskDistribution() {
+    String carbonTaskDistribution = carbonProperties.getProperty(CARBON_TASK_DISTRIBUTION);
+    boolean isValid = carbonTaskDistribution != null && (
+        carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+            || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_BLOCKLET)
+            || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_BLOCK)
+            || carbonTaskDistribution.equalsIgnoreCase(CARBON_TASK_DISTRIBUTION_CUSTOM));
+    if (!isValid) {
+      LOGGER.warn("The carbon task distribution value \"" + carbonTaskDistribution
           + "\" is invalid. Using the default value \""
-          + CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT);
-      carbonProperties.setProperty(CARBON_COMBINE_SMALL_INPUT_FILES,
-          CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT);
+          + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT);
+      carbonProperties.setProperty(CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
index ae29b03..daf6db0 100644
--- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -77,7 +77,7 @@ public class CarbonPropertiesValidationTest extends TestCase {
     String valueAfterValidation =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
     assertTrue(valueBeforeValidation.equals(valueAfterValidation));
-    assertTrue(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT
+    assertTrue("false"
         .equalsIgnoreCase(valueAfterValidation));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
index 15126b6..4104ea3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -155,6 +155,25 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     checkAnswer(sql("select count(*) from weather6"), Seq(Row(2)))
   }
 
+  test("Test overwrite static partition with wrong int value") {
+    sql(
+      """
+        | CREATE TABLE weather7 (type String)
+        | PARTITIONED BY (year int, month int, day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql("insert into weather7 partition(year=2014, month=05, day=25) select 'rainy'")
+    sql("insert into weather7 partition(year=2014, month=04, day=23) select 'cloudy'")
+    sql("insert overwrite table weather7 partition(year=2014, month=05, day=25) select 'sunny'")
+    checkExistence(sql("select * from weather7"), true, "sunny")
+    checkAnswer(sql("select count(*) from weather7"), Seq(Row(2)))
+    sql("insert into weather7 partition(year=2014, month, day) select 'rainy1',06,25")
+    sql("insert into weather7 partition(year=2014, month=01, day) select 'rainy2',27")
+    sql("insert into weather7 partition(year=2014, month=01, day=02) select 'rainy3'")
+    checkAnswer(sql("select count(*) from weather7 where month=1"), Seq(Row(2)))
+  }
+
 
   override def afterAll = {
     dropTable
@@ -168,6 +187,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql("drop table if exists insertstaticpartitiondynamic")
     sql("drop table if exists partitionallcompaction")
     sql("drop table if exists weather6")
+    sql("drop table if exists weather7")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index cc68b9c..a04e9e1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -145,7 +145,9 @@ class CarbonScanRDD(
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
-
+      val carbonDistribution = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
       // If bucketing is enabled on table then partitions should be grouped based on buckets.
       if (bucketedTable != null) {
         var i = 0
@@ -161,101 +163,106 @@ class CarbonScanRDD(
           i += 1
           result.add(partition)
         }
-      } else if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) {
-        // create a list of block based on split
-        val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
-
-        // get the list of executors and map blocks to executors based on locality
-        val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
-
-        // divide the blocks among the tasks of the nodes as per the data locality
-        val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
-          parallelism, activeNodes.toList.asJava)
-        var i = 0
-        // Create Spark Partition for each task and assign blocks
-        nodeBlockMapping.asScala.foreach { case (node, blockList) =>
-          blockList.asScala.foreach { blocksPerTask =>
-            val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
-            if (blocksPerTask.size() != 0) {
-              val multiBlockSplit =
-                new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
-              val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
-              result.add(partition)
-              i += 1
+      } else {
+        val useCustomDistribution =
+          CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+            "false").toBoolean ||
+          carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
+        if (useCustomDistribution) {
+          // create a list of block based on split
+          val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
+
+          // get the list of executors and map blocks to executors based on locality
+          val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+
+          // divide the blocks among the tasks of the nodes as per the data locality
+          val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+            parallelism, activeNodes.toList.asJava)
+          var i = 0
+          // Create Spark Partition for each task and assign blocks
+          nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+            blockList.asScala.foreach { blocksPerTask =>
+              val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+              if (blocksPerTask.size() != 0) {
+                val multiBlockSplit =
+                  new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
+                val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
+                result.add(partition)
+                i += 1
+              }
             }
           }
-        }
-        noOfNodes = nodeBlockMapping.size
-      } else if (CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
-        CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
-        // Use blocklet distribution
-        // Randomize the blocklets for better shuffling
-        Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
-          val multiBlockSplit =
-            new CarbonMultiBlockSplit(identifier,
-              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
-              splitWithIndex._1.getLocations)
-          val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
-          result.add(partition)
-        }
-      } else if (CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES,
-        CarbonCommonConstants.CARBON_COMBINE_SMALL_INPUT_FILES_DEFAULT).toBoolean) {
-
-        // sort blocks in reverse order of length
-        val blockSplits = splits
-          .asScala
-          .map(_.asInstanceOf[CarbonInputSplit])
-          .groupBy(f => f.getBlockPath)
-          .map { blockSplitEntry =>
-            new CarbonMultiBlockSplit(identifier,
-              blockSplitEntry._2.asJava,
-              blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
-          }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
-
-        val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
-        val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
-        val defaultParallelism = spark.sparkContext.defaultParallelism
-        val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
-        val bytesPerCore = totalBytes / defaultParallelism
-
-        val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-        LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-                    s"open cost is considered as scanning $openCostInBytes bytes.")
-
-        val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
-        var currentSize = 0L
-
-        def closePartition(): Unit = {
-          if (currentFiles.nonEmpty) {
-            result.add(combineSplits(currentFiles, currentSize, result.size()))
+          noOfNodes = nodeBlockMapping.size
+        } else if (carbonDistribution.equalsIgnoreCase(
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)) {
+          // Use blocklet distribution
+          // Randomize the blocklets for better shuffling
+          Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier,
+                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+                splitWithIndex._1.getLocations)
+            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+            result.add(partition)
+          }
+        } else if (carbonDistribution.equalsIgnoreCase(
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {
+
+          // sort blocks in reverse order of length
+          val blockSplits = splits
+            .asScala
+            .map(_.asInstanceOf[CarbonInputSplit])
+            .groupBy(f => f.getBlockPath)
+            .map { blockSplitEntry =>
+              new CarbonMultiBlockSplit(identifier,
+                blockSplitEntry._2.asJava,
+                blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
+            }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
+
+          val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+          val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+          val defaultParallelism = spark.sparkContext.defaultParallelism
+          val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
+          val bytesPerCore = totalBytes / defaultParallelism
+
+          val maxSplitBytes = Math
+            .min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+          LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+          val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
+          var currentSize = 0L
+
+          def closePartition(): Unit = {
+            if (currentFiles.nonEmpty) {
+              result.add(combineSplits(currentFiles, currentSize, result.size()))
+            }
+            currentFiles.clear()
+            currentSize = 0
           }
-          currentFiles.clear()
-          currentSize = 0
-        }
 
-        blockSplits.foreach { file =>
-          if (currentSize + file.getLength > maxSplitBytes) {
-            closePartition()
+          blockSplits.foreach { file =>
+            if (currentSize + file.getLength > maxSplitBytes) {
+              closePartition()
+            }
+            // Add the given file to the current partition.
+            currentSize += file.getLength + openCostInBytes
+            currentFiles += file
+          }
+          closePartition()
+        } else {
+          // Use block distribution
+          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy { f =>
+            f.getSegmentId.concat(f.getBlockPath)
+          }.values.zipWithIndex.foreach { splitWithIndex =>
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier,
+                splitWithIndex._1.asJava,
+                splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
+            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
+            result.add(partition)
           }
-          // Add the given file to the current partition.
-          currentSize += file.getLength + openCostInBytes
-          currentFiles += file
-        }
-        closePartition()
-      } else {
-        // Use block distribution
-        splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-          .groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { splitWithIndex =>
-          val multiBlockSplit =
-            new CarbonMultiBlockSplit(identifier,
-              splitWithIndex._1.asJava,
-              splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray)
-          val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
-          result.add(partition)
         }
       }
 

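The merge_small_files branch packs splits greedily by size. A standalone sketch of the same calculation over plain file lengths (names and signature are illustrative, not the CarbonData API):

    import scala.collection.mutable.ArrayBuffer

    // Greedy size-based packing mirroring the merge_small_files branch above.
    def planPartitions(fileLengths: Seq[Long], defaultMaxSplitBytes: Long,
        openCostInBytes: Long, defaultParallelism: Int): Seq[Seq[Long]] = {
      val totalBytes = fileLengths.map(_ + openCostInBytes).sum
      val bytesPerCore = totalBytes / defaultParallelism
      val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
      val result = ArrayBuffer[Seq[Long]]()
      val current = ArrayBuffer[Long]()
      var currentSize = 0L
      def closePartition(): Unit = {
        if (current.nonEmpty) result += current.toList
        current.clear()
        currentSize = 0L
      }
      // Largest files first; close a partition when the next file would exceed maxSplitBytes.
      fileLengths.sorted(Ordering[Long].reverse).foreach { len =>
        if (currentSize + len > maxSplitBytes) closePartition()
        currentSize += len + openCostInBytes
        current += len
      }
      closePartition()
      result.toList
    }
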
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 6c8a6b0..86d25b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -177,9 +177,9 @@ object CarbonScalaUtil {
       return null
     }
     dataType match {
-      case TimestampType =>
+      case TimestampType if timeStampFormat != null =>
         DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000)
-      case DateType =>
+      case DateType if dateFormat != null =>
         DateTimeUtils.dateToString(
           (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt)
       case ShortType => value.toShort.toString
@@ -233,16 +233,18 @@ object CarbonScalaUtil {
       serializationNullFormat: String,
       badRecordAction: String,
       isEmptyBadRecord: Boolean): Map[String, String] = {
+    val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
     partitionSpec.map{ case (col, pvalue) =>
       // replace special string with empty value.
-      val value = if (pvalue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      val value = if (pvalue == null) {
+        hivedefaultpartition
+      } else if (pvalue.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
         ""
       } else {
         pvalue
       }
       val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase)
       val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
-      val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
       try {
         if (isEmptyBadRecord && value.length == 0 &&
             badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) &&
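
The heart of the fix is that partition values are normalized to the column's datatype before they reach the partition map file, so "05" and "5" name the same integer partition. A minimal sketch of that normalization (the real conversion goes through CarbonScalaUtil.updatePartitions / convertToCarbonFormat):

    import org.apache.spark.sql.types._

    // Illustrative only: round-trip a value through its Spark datatype.
    def normalizePartitionValue(value: String, dataType: DataType): String = dataType match {
      case ShortType   => value.toShort.toString
      case IntegerType => value.toInt.toString   // "05" becomes "5"
      case LongType    => value.toLong.toString
      case _           => value
    }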

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9577615..6b43152 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -521,9 +521,10 @@ case class CarbonLoadDataCommand(
     CarbonSession.threadSet(
       CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
       badRecordAction)
+    val isEmptyBadRecord = carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)
     CarbonSession.threadSet(
       CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+      isEmptyBadRecord)
     try {
       val query: LogicalPlan = if (dataFrame.isDefined) {
         val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
@@ -632,7 +633,13 @@ case class CarbonLoadDataCommand(
           overwrite = false,
           ifPartitionNotExists = false)
       if (isOverwriteTable && partition.nonEmpty) {
-        overwritePartition(sparkSession, table, convertedPlan)
+        overwritePartition(
+          sparkSession,
+          table,
+          convertedPlan,
+          serializationNullFormat,
+          badRecordAction,
+          isEmptyBadRecord.toBoolean)
       } else {
         Dataset.ofRows(sparkSession, convertedPlan)
       }
@@ -754,11 +761,25 @@ case class CarbonLoadDataCommand(
   private def overwritePartition(
       sparkSession: SparkSession,
       table: CarbonTable,
-      logicalPlan: LogicalPlan): Unit = {
+      logicalPlan: LogicalPlan,
+      serializationNullFormat: String,
+      badRecordAction: String,
+      isEmptyBadRecord: Boolean): Unit = {
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
+
+    // Update the partitions as per the datatype expect for time and datetype as we
+    // expect user provides the format in standard spark/hive formats.
+    val updatedPartitions = CarbonScalaUtil.updatePartitions(
+      partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)),
+      table,
+      timeFormat = null,
+      dateFormat = null,
+      serializationNullFormat,
+      badRecordAction,
+      isEmptyBadRecord)
     val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
       identifier,
-      Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get))))
+      Some(updatedPartitions))
     val partitionNames = existingPartitions.toList.flatMap { partition =>
       partition.spec.seq.map{case (column, value) => column + "=" + value}
     }.toSet
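
In overwritePartition the user-supplied static partition spec is now normalized before it is matched against the partitions listed from the catalog, so partition(month=05) resolves to the same partition as month=5. A hypothetical illustration of the mismatch the normalization avoids:

    // Illustrative only: raw string specs do not match, normalized ones do.
    val storedSpec = Map("year" -> "2014", "month" -> "5", "day" -> "25")
    val userSpec   = Map("year" -> "2014", "month" -> "05", "day" -> "25")
    assert(storedSpec != userSpec)
    val normalized = userSpec.map { case (k, v) => (k, v.toInt.toString) }
    assert(storedSpec == normalized)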

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 20a6bab..2f12bef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -143,7 +143,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 
       case e: Exception =>
-        LOGGER.error("Exception in update operation" + e)
+        LOGGER.error(e, "Exception in update operation")
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index c43a204..d74e461 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -245,7 +245,7 @@ private class CarbonOutputWriter(path: String,
       null
     }
   }
-  lazy val partitionData = if (partitions.nonEmpty) {
+  lazy val (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
     val updatedPartitions = partitions.map{ p =>
       val value = p.substring(p.indexOf("=") + 1, p.length)
       val col = p.substring(0, p.indexOf("="))
@@ -273,24 +273,25 @@ private class CarbonOutputWriter(path: String,
         dateFormatString = loadModel.getDefaultDateFormat
       }
       val dateFormat = new SimpleDateFormat(dateFormatString)
-      updatedPartitions.map {case (col, value) =>
+      val formattedPartitions = updatedPartitions.map {case (col, value) =>
+        // Only convert the static partitions to the carbon format and use it while loading data
         // to carbon.
         if (staticPartition.getOrDefault(col, false)) {
-          CarbonScalaUtil.convertToCarbonFormat(value,
+          (col, CarbonScalaUtil.convertToCarbonFormat(value,
             CarbonScalaUtil.convertCarbonToSparkDataType(
               table.getColumnByName(table.getTableName, col).getDataType),
             timeFormat,
-            dateFormat)
+            dateFormat))
         } else {
-          value
+          (col, value)
         }
       }
+      (formattedPartitions, formattedPartitions.map(_._2))
     } else {
-      updatedPartitions.map(_._2)
+      (updatedPartitions, updatedPartitions.map(_._2))
     }
   } else {
-    Array.empty
+    (Map.empty[String, String].toArray, Array.empty)
   }
   val writable = new StringArrayWritable()
 
@@ -346,41 +347,17 @@ private class CarbonOutputWriter(path: String,
     val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean
     // write partition info to new file.
     val partitonList = new util.ArrayList[String]()
-    val splitPartitions = partitions.map{ p =>
-      val value = p.substring(p.indexOf("=") + 1, p.length)
-      val col = p.substring(0, p.indexOf("="))
-      (col, value)
-    }.toMap
-    val updatedPartitions =
-      if (staticPartition != null) {
-        // There can be scnerio like dynamic and static combination, in that case we should convert
-        // only the dyanamic partition values to the proper format and store to carbon parttion map
-        splitPartitions.map { case (col, value) =>
-          if (!staticPartition.getOrDefault(col, false)) {
-            CarbonScalaUtil.updatePartitions(
-              Seq((col, value)).toMap,
-              table,
-              timeFormat,
-              dateFormat,
-              serializeFormat,
-              badRecordAction,
-              isEmptyBadRecord).toSeq.head
-          } else {
-            (col, value)
-          }
-        }
-      } else {
-        // All dynamic partitions need to be converted to proper format
-        CarbonScalaUtil.updatePartitions(
-          splitPartitions,
-          table,
-          timeFormat,
-          dateFormat,
-          serializeFormat,
-          badRecordAction,
-          isEmptyBadRecord)
-      }
-    updatedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
+    val formattedPartitions =
+    // All dynamic partitions need to be converted to proper format
+      CarbonScalaUtil.updatePartitions(
+        updatedPartitions.toMap,
+        table,
+        timeFormat,
+        dateFormat,
+        serializeFormat,
+        badRecordAction,
+        isEmptyBadRecord)
+    formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
     new PartitionMapFileStore().writePartitionMapFile(
       segmentPath,
       loadModel.getTaskNo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index eb59184..93c7c09 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -182,10 +182,10 @@ object CarbonMetaStoreFactory {
   def createCarbonMetaStore(conf: RuntimeConfig): CarbonMetaStore = {
     val readSchemaFromHiveMetaStore = readSchemaFromHive(conf)
     if (readSchemaFromHiveMetaStore) {
-      LOGGER.info("Hive based carbon metastore is enabled")
+      LOGGER.audit("Hive based carbon metastore is enabled")
       new CarbonHiveMetaStore()
     } else {
-      LOGGER.info("File based carbon metastore is enabled")
+      LOGGER.audit("File based carbon metastore is enabled")
       new CarbonFileMetastore()
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
index 837b21f..15ccb0c 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
@@ -42,11 +42,11 @@ class CarbonSQLConf(sparkSession: SparkSession) {
           CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
     val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
       SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To enable/ disable carbon custom block distribution.")
-        .booleanConf
+        .doc("To set carbon task distribution.")
+        .stringConf
         .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
     val BAD_RECORDS_LOGGER_ENABLE =
       SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
         .doc("To enable/ disable carbon bad record logger.")
@@ -117,8 +117,8 @@ class CarbonSQLConf(sparkSession: SparkSession) {
         CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
     sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
       carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
     sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
       CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
     sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b4dc866f/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
index eef6604..2128ffd 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
@@ -41,11 +41,11 @@ class CarbonSQLConf(sparkSession: SparkSession) {
           CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
     val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
       buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To enable/ disable carbon custom block distribution.")
-        .booleanConf
+        .doc("To set carbon task distribution.")
+        .stringConf
         .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
     val BAD_RECORDS_LOGGER_ENABLE =
       buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
         .doc("To enable/ disable carbon bad record logger.")
@@ -116,8 +116,8 @@ class CarbonSQLConf(sparkSession: SparkSession) {
         CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
     sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
       carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
     sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
       CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
     sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,

