carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [3/4] incubator-carbondata git commit: Improved spark module code. * Removed some compilation warnings. * Replaced pattern matching on booleans with IF-ELSE. * Improved code according to scala standards. * Removed unnecessary new lines. * Added string inter
Date Sat, 19 Nov 2016 02:16:52 GMT
Improved spark module code.
* Removed some compilation warnings.
* Replaced pattern matching on booleans with IF-ELSE.
* Improved code according to scala standards.
* Removed unnecessary new lines.
* Added string interpolation instead of string concatenation.
* Removed unnecessary semi-colons.
* Fixed indentation.
* add useKettle option for loading
* Fixed indentation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6391c2be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6391c2be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6391c2be

Branch: refs/heads/master
Commit: 6391c2be31f347688a3dbe9f9657e3dd75158684
Parents: c5176f3
Author: Prabhat Kashyap <prabhat.kashyap@knoldus.in>
Authored: Wed Oct 19 22:24:47 2016 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sat Nov 19 09:51:04 2016 +0800

----------------------------------------------------------------------
 .../examples/AllDictionaryExample.scala         |   2 +
 .../carbondata/examples/CarbonExample.scala     |   2 +
 .../spark/sql/common/util/QueryTest.scala       |   6 +-
 .../spark/CarbonDataFrameWriter.scala           |   4 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   2 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           | 267 ++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        | 619 +++++++++----------
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 101 +--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  91 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  57 +-
 .../apache/carbondata/spark/rdd/Compactor.scala |  71 +--
 .../spark/tasks/DictionaryWriterTask.scala      |  10 +-
 .../spark/thriftserver/CarbonThriftServer.scala |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |  14 +-
 .../spark/util/DataTypeConverterUtil.scala      |   2 +-
 .../spark/util/GlobalDictionaryUtil.scala       | 212 +++----
 .../org/apache/spark/sql/CarbonContext.scala    |  22 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  17 +-
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 347 ++++++-----
 .../spark/sql/SparkUnknownExpression.scala      |  53 +-
 .../execution/command/carbonTableSchema.scala   | 270 ++++----
 .../spark/sql/hive/CarbonMetastoreCatalog.scala |  65 +-
 .../spark/sql/hive/DistributionUtil.scala       |  35 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  13 +-
 .../scala/org/apache/spark/util/FileUtils.scala |  10 +-
 28 files changed, 1141 insertions(+), 1191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
index dcdf41f..9fecadb 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
@@ -21,6 +21,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.{AllDictionaryUtil, ExampleUtils}
 
 object AllDictionaryExample {
+
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("CarbonExample")
     val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -57,4 +58,5 @@ object AllDictionaryExample {
     // clean local dictionary files
     AllDictionaryUtil.cleanDictionary(allDictFile)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 038f609..f98d46d 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.examples.util.ExampleUtils
 
 object CarbonExample {
+
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("CarbonExample")
     val testData = ExampleUtils.currentPath + "/src/main/resources/data.csv"
@@ -73,4 +74,5 @@ object CarbonExample {
     // Drop table
     cc.sql("DROP TABLE IF EXISTS t3")
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index f9960d3..587013f 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -140,7 +140,7 @@ object QueryTest {
              |$e
              |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
           """.stripMargin
-        return Some(errorMessage)
+        Some(errorMessage)
     }
 
     if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
@@ -157,9 +157,9 @@ object QueryTest {
               prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
         }
       """.stripMargin
-      return Some(errorMessage)
+      Some(errorMessage)
     }
 
-    return None
+    None
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a02751e..3596393 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -126,8 +126,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
       options.tableName,
       null,
       Seq(),
-      Map(("fileheader" -> header)),
-      false,
+      Map("fileheader" -> header),
+      isOverwriteExist = false,
       null,
       Some(dataFrame)).run(cc)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 711c51c..3162f80 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -121,8 +121,8 @@ object CarbonFilters {
       expr match {
         case or@ Or(left, right) =>
 
-          val leftFilter = translate(left, true)
-          val rightFilter = translate(right, true)
+          val leftFilter = translate(left, or = true)
+          val rightFilter = translate(right, or = true)
           if (leftFilter.isDefined && rightFilter.isDefined) {
             Some( sources.Or(leftFilter.get, rightFilter.get))
           } else {
@@ -265,29 +265,27 @@ object CarbonFilters {
             Some(new EqualToExpression(transformExpression(child).get,
              transformExpression(Literal(null)).get, true))
         case Not(In(a: Attribute, list))
-         if !list.exists(!_.isInstanceOf[Literal]) =>
-         if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
-          Some(new FalseExpression(transformExpression(a).get))
-         }
-        else {
-          Some(new NotInExpression(transformExpression(a).get,
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get,
               new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-            }
+          }
         case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
         case Not(In(Cast(a: Attribute, _), list))
           if !list.exists(!_.isInstanceOf[Literal]) =>
-        /* if any illogical expression comes in NOT IN Filter like
-         NOT IN('scala',NULL) this will be treated as false expression and will
-         always return no result. */
-          if (list.exists(x => (isNullLiteral(x.asInstanceOf[Literal])))) {
-          Some(new FalseExpression(transformExpression(a).get))
-         }
-        else {
-          Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+          /* if any illogical expression comes in NOT IN Filter like
+           NOT IN('scala',NULL) this will be treated as false expression and will
+           always return no result. */
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get, new ListExpression(
               convertToJavaList(list.map(transformExpression(_).get)))))
-              }
+          }
         case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
           Some(new InExpression(transformExpression(a).get,
             new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 3ba32d2..3a5d952 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -76,7 +76,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 856e67c..2a36f30 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -26,7 +26,8 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
@@ -78,11 +79,11 @@ class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
 }
 
 class SparkPartitionLoader(model: CarbonLoadModel,
-                           splitIndex: Int,
-                           storePath: String,
-                           kettleHomePath: String,
-                           loadCount: Int,
-                           loadMetadataDetails: LoadMetadataDetails) extends Logging{
+    splitIndex: Int,
+    storePath: String,
+    kettleHomePath: String,
+    loadCount: Int,
+    loadMetadataDetails: LoadMetadataDetails) extends Logging {
 
   var storeLocation: String = ""
 
@@ -106,7 +107,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     // container temp dir or is yarn application directory.
     val carbonUseLocalDir = CarbonProperties.getInstance()
       .getProperty("carbon.use.local.dir", "false")
-    if(carbonUseLocalDir.equalsIgnoreCase("true")) {
+    if (carbonUseLocalDir.equalsIgnoreCase("true")) {
       val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != storeLocations && storeLocations.nonEmpty) {
         storeLocation = storeLocations(Random.nextInt(storeLocations.length))
@@ -114,8 +115,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
       if (storeLocation == null) {
         storeLocation = System.getProperty("java.io.tmpdir")
       }
-    }
-    else {
+    } else {
       storeLocation = System.getProperty("java.io.tmpdir")
     }
     storeLocation = storeLocation + '/' + System.nanoTime() + '/' + splitIndex
@@ -127,7 +127,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
         kettleHomePath)
     } catch {
       case e: DataLoadingException => if (e.getErrorCode ==
-        DataProcessorConstants.BAD_REC_FOUND) {
+                                          DataProcessorConstants.BAD_REC_FOUND) {
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
         logInfo("Bad Record Found")
       } else {
@@ -160,6 +160,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     }
   }
 }
+
 /**
  * Use this RDD class to load csv data file
  *
@@ -171,7 +172,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
  * @param partitioner           Partitioner which specify how to partition
  * @param columinar             whether it is columinar
  * @param loadCount             Current load count
- * @param tableCreationTime      Time of creating table
+ * @param tableCreationTime     Time of creating table
  * @param schemaLastUpdatedTime Time of last schema update
  * @param blocksGroupBy         Blocks Array which is group by partition or host
  * @param isTableSplitPartition Whether using table split partition
@@ -195,30 +196,29 @@ class DataFileLoaderRDD[K, V](
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        var splits = Array[TableSplit]()
-        if (carbonLoadModel.isDirectLoad) {
-          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-            partitioner.nodeList, partitioner.partitionCount)
-        }
-        else {
-          splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName, null, partitioner)
-        }
+    if (isTableSplitPartition) {
+      // for table split partition
+      var splits = Array[TableSplit]()
+      if (carbonLoadModel.isDirectLoad) {
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+          partitioner.nodeList, partitioner.partitionCount)
+      } else {
+        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName, null, partitioner)
+      }
 
-        splits.zipWithIndex.map {s =>
-          // filter the same partition unique id, because only one will match, so get 0 element
-          val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
-            p._1 == s._1.getPartition.getUniqueID)(0)._2
-          new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
-        }
-      case false =>
-        // for node partition
-        blocksGroupBy.zipWithIndex.map{b =>
-          new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
-        }
+      splits.zipWithIndex.map { case (split, index) =>
+        // filter the same partition unique id, because only one will match, so get 0 element
+        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
+          uniqueId == split.getPartition.getUniqueID
+        }(0)._2
+        new CarbonTableSplitPartition(id, index, split, blocksDetails)
+      }
+    } else {
+      // for node partition
+      blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
+        new CarbonNodePartition(id, index, uniqueId, blockDetails)
+      }
     }
   }
 
@@ -242,16 +242,14 @@ class DataFileLoaderRDD[K, V](
         setModelAndBlocksInfo()
         val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
+        loader.initialize()
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         if (model.isRetentionRequest) {
           recreateAggregationTableForRetention
-        }
-        else if (model.isAggLoadRequest) {
+        } else if (model.isAggLoadRequest) {
           loadMetadataDetails.setLoadStatus(createManualAggregateTable)
-        }
-        else {
-          loader.run
+        } else {
+          loader.run()
         }
       } catch {
         case e: Exception =>
@@ -261,52 +259,50 @@ class DataFileLoaderRDD[K, V](
       }
 
       def setModelAndBlocksInfo(): Unit = {
-        isTableSplitPartition match {
-          case true =>
-            // for table split partition
-            val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
-            logInfo("Input split: " + split.serializableHadoopSplit.value)
-            val blocksID = gernerateBlocksID
-            carbonLoadModel.setBlocksID(blocksID)
-            carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-            if (carbonLoadModel.isDirectLoad) {
-              model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID,
-                split.serializableHadoopSplit.value.getPartition.getFilesPath,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-            } else {
-              model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID)
-            }
-            partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-            // get this partition data blocks and put it to global static map
-            GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
-            StandardLogService.setThreadName(partitionID, null)
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordPartitionBlockMap(
-              partitionID, split.partitionBlocksDetail.length)
-          case false =>
-            // for node partition
-            val split = theSplit.asInstanceOf[CarbonNodePartition]
-            logInfo("Input split: " + split.serializableHadoopSplit)
-            logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
-            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordHostBlockMap(
-              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
-            val blocksID = gernerateBlocksID
-            carbonLoadModel.setBlocksID(blocksID)
-            carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-            // set this node blocks info to global static map
-            GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
-            if (carbonLoadModel.isDirectLoad) {
-              val filelist: java.util.List[String] = new java.util.ArrayList[String](
-                CarbonCommonConstants.CONSTANT_SIZE_TEN)
-              CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-              model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-            }
-            else {
-              model = carbonLoadModel.getCopyWithPartition(partitionID)
-            }
-            StandardLogService.setThreadName(blocksID, null)
+        if (isTableSplitPartition) {
+          // for table split partition
+          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+          logInfo("Input split: " + split.serializableHadoopSplit.value)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID,
+              split.serializableHadoopSplit.value.getPartition.getFilesPath,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID)
+          }
+          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+          // get this partition data blocks and put it to global static map
+          GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
+          StandardLogService.setThreadName(partitionID, null)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+            partitionID, split.partitionBlocksDetail.length)
+        } else {
+          // for node partition
+          val split = theSplit.asInstanceOf[CarbonNodePartition]
+          logInfo("Input split: " + split.serializableHadoopSplit)
+          logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+          val blocksID = gernerateBlocksID
+          carbonLoadModel.setBlocksID(blocksID)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          // set this node blocks info to global static map
+          GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
+          if (carbonLoadModel.isDirectLoad) {
+            val filelist: java.util.List[String] = new java.util.ArrayList[String](
+              CarbonCommonConstants.CONSTANT_SIZE_TEN)
+            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(partitionID)
+          }
+          StandardLogService.setThreadName(blocksID, null)
         }
       }
 
@@ -316,14 +312,13 @@ class DataFileLoaderRDD[K, V](
        * @return
        */
       def gernerateBlocksID: String = {
-        isTableSplitPartition match {
-          case true =>
-            carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-            theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
-              .getPartition.getUniqueID + "_" + UUID.randomUUID()
-          case false =>
-            carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-            UUID.randomUUID()
+        if (isTableSplitPartition) {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          UUID.randomUUID()
         }
       }
 
@@ -351,8 +346,7 @@ class DataFileLoaderRDD[K, V](
             CarbonLoaderUtil
               .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
             logInfo(s"Aggregate table creation failed")
-          }
-          else {
+          } else {
             logInfo("Aggregate tables creation successfull")
           }
         }
@@ -425,6 +419,7 @@ class DataFileLoaderRDD[K, V](
       }
 
       var finished = false
+
       override def hasNext: Boolean = {
         !finished
       }
@@ -438,46 +433,46 @@ class DataFileLoaderRDD[K, V](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    isTableSplitPartition match {
-      case true =>
-        // for table split partition
-        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-        location
-      case false =>
-        // for node partition
-        val theSplit = split.asInstanceOf[CarbonNodePartition]
-        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-        logInfo("Preferred Location for split : " + firstOptionLocation(0))
-        val blockMap = new util.LinkedHashMap[String, Integer]()
-        val tableBlocks = theSplit.blocksDetails
-        tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
-          location => {
-            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
-              val currentCount = blockMap.get(location)
-              if (currentCount == null) {
-                blockMap.put(location, 1)
-              } else {
-                blockMap.put(location, currentCount + 1)
-              }
+    if (isTableSplitPartition) {
+      // for table split partition
+      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+      location
+    } else {
+      // for node partition
+      val theSplit = split.asInstanceOf[CarbonNodePartition]
+      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+      logInfo("Preferred Location for split: " + firstOptionLocation.head)
+      val blockMap = new util.LinkedHashMap[String, Integer]()
+      val tableBlocks = theSplit.blocksDetails
+      tableBlocks.foreach { tableBlock =>
+        tableBlock.getLocations.foreach { location =>
+          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
+            val currentCount = blockMap.get(location)
+            if (currentCount == null) {
+              blockMap.put(location, 1)
+            } else {
+              blockMap.put(location, currentCount + 1)
             }
           }
-        )
-        )
-
-        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-          nodeCount1.getValue > nodeCount2.getValue
         }
-        )
+      }
+
+      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+        nodeCount1.getValue > nodeCount2.getValue
+      }
+      )
 
-        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-        firstOptionLocation ++ sortedNodesList
+      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+      firstOptionLocation ++ sortedNodesList
     }
   }
+
 }
 
 /**
  * Use this RDD class to load RDD
+ *
  * @param sc
  * @param result
  * @param carbonLoadModel
@@ -512,7 +507,7 @@ class DataFrameLoaderRDD[K, V](
       var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-        theSplit.index
+                               theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
@@ -521,14 +516,14 @@ class DataFrameLoaderRDD[K, V](
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
+        loader.initialize()
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         val rddIteratorKey = UUID.randomUUID().toString
-        try{
+        try {
           RddInputUtils.put(rddIteratorKey,
             new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
           carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-          loader.run
+          loader.run()
         } finally {
           RddInputUtils.remove(rddIteratorKey)
         }
@@ -540,6 +535,7 @@ class DataFrameLoaderRDD[K, V](
       }
 
       var finished = false
+
       override def hasNext: Boolean = !finished
 
       override def next(): (K, V) = {
@@ -556,11 +552,12 @@ class DataFrameLoaderRDD[K, V](
 /**
  * This class wrap Scala's Iterator to Java's Iterator.
  * It also convert all columns to string data to use csv data loading flow.
+ *
  * @param rddIter
  * @param carbonLoadModel
  */
 class RddIterator(rddIter: Iterator[Row],
-                  carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+    carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   val format = new SimpleDateFormat(formatString)
@@ -570,9 +567,10 @@ class RddIterator(rddIter: Iterator[Row],
   def hasNext: Boolean = rddIter.hasNext
 
   private def getString(value: Any, level: Int = 1): String = {
-    value == null match {
-      case true => ""
-      case false => value match {
+    if (value == null) {
+      ""
+    } else {
+      value match {
         case s: String => s
         case i: java.lang.Integer => i.toString
         case d: java.lang.Double => d.toString
@@ -623,4 +621,5 @@ class RddIterator(rddIter: Iterator[Row],
 
   def remove(): Unit = {
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4392efe..1382efa 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -32,7 +32,8 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
@@ -44,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
+import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable,
+CompactionType}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -56,6 +58,7 @@ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
 
+
 /**
  * This is the factory class which can create different RDD depends on user needs.
  *
@@ -178,8 +181,12 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def configSplitMaxSize(context: SparkContext, filePaths: String,
-    hadoopConfiguration: Configuration): Unit = {
-    val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
+      hadoopConfiguration: Configuration): Unit = {
+    val defaultParallelism = if (context.defaultParallelism < 1) {
+      1
+    } else {
+      context.defaultParallelism
+    }
     val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
     val blockSize =
       hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
@@ -191,30 +198,26 @@ object CarbonDataRDDFactory extends Logging {
         newSplitSize = CarbonCommonConstants.CARBON_16MB
       }
       hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
-      logInfo("totalInputSpaceConsumed : " + spaceConsumed +
-        " , defaultParallelism : " + defaultParallelism)
-      logInfo("mapreduce.input.fileinputformat.split.maxsize : " + newSplitSize.toString)
+      logInfo(s"totalInputSpaceConsumed: $spaceConsumed , defaultParallelism: $defaultParallelism")
+      logInfo(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
     }
   }
 
   def alterTableForCompaction(sqlContext: SQLContext,
-    alterTableModel: AlterTableModel,
-    carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
-    kettleHomePath: String, storeLocation: String): Unit = {
+      alterTableModel: AlterTableModel,
+      carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String,
+      kettleHomePath: String, storeLocation: String): Unit = {
     var compactionSize: Long = 0
     var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
     if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
       compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
       compactionType = CompactionType.MAJOR_COMPACTION
-    }
-    else {
+    } else {
       compactionType = CompactionType.MINOR_COMPACTION
     }
 
-    logger
-      .audit(s"Compaction request received for table " +
-        s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-      )
+    logger.audit(s"Compaction request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
       .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
@@ -244,9 +247,7 @@ object CarbonDataRDDFactory extends Logging {
     // if any other request comes at this time then it will create a compaction request file.
     // so that this will be taken up by the compaction process which is executing.
     if (!isConcurrentCompactionAllowed) {
-      logger
-        .info("System level compaction lock is enabled."
-        )
+      logger.info("System level compaction lock is enabled.")
       handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
         partitioner,
@@ -257,8 +258,7 @@ object CarbonDataRDDFactory extends Logging {
         carbonTable,
         compactionModel
       )
-    }
-    else {
+    } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
         .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
@@ -266,10 +266,8 @@ object CarbonDataRDDFactory extends Logging {
         )
 
       if (lock.lockWithRetries()) {
-        logger
-          .info("Acquired the compaction lock for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.info("Acquired the compaction lock for table" +
+                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -280,45 +278,37 @@ object CarbonDataRDDFactory extends Logging {
             compactionModel,
             lock
           )
-        }
-        catch {
-          case e : Exception =>
-            logger.error("Exception in start compaction thread. " + e.getMessage)
+        } catch {
+          case e: Exception =>
+            logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
             lock.unlock()
         }
-      }
-      else {
-        logger
-          .audit("Not able to acquire the compaction lock for table " +
-            s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-          )
-        logger
-          .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+      } else {
+        logger.audit("Not able to acquire the compaction lock for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        logger.error(s"Not able to acquire the compaction lock for table" +
+                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
   }
 
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storePath: String,
-    kettleHomePath: String,
-    storeLocation: String,
-    compactionType: CompactionType,
-    carbonTable: CarbonTable,
-    compactionModel: CompactionModel): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storePath: String,
+      kettleHomePath: String,
+      storeLocation: String,
+      compactionType: CompactionType,
+      carbonTable: CarbonTable,
+      compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
       .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
         LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
       )
     if (lock.lockWithRetries()) {
-      logger
-        .info("Acquired the compaction lock for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+      logger.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
+                  s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -329,50 +319,43 @@ object CarbonDataRDDFactory extends Logging {
           compactionModel,
           lock
         )
-      }
-      catch {
-        case e : Exception =>
-          logger.error("Exception in start compaction thread. " + e.getMessage)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
           lock.unlock()
           // if the compaction is a blocking call then only need to throw the exception.
           if (compactionModel.isDDLTrigger) {
             throw e
           }
       }
-    }
-    else {
-      logger
-        .audit("Not able to acquire the system level compaction lock for table " +
-          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}"
-        )
-      logger
-        .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+    } else {
+      logger.audit("Not able to acquire the system level compaction lock for table " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      logger.error("Not able to acquire the compaction lock for table " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
         .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
-      if(compactionModel.isDDLTrigger) {
-        sys.error("Compaction is in progress, compaction request for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue.")
-      }
-      else {
-        logger
-          .error("Compaction is in progress, compaction request for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName + " is in queue."
-          )
+      if (compactionModel.isDDLTrigger) {
+        sys.error("Compaction is in progress, compaction request for table " +
+                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                  " is in queue.")
+      } else {
+        logger.error("Compaction is in progress, compaction request for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                     " is in queue.")
       }
     }
   }
 
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
-    storePath: String,
-    compactionModel: CompactionModel,
-    partitioner: Partitioner,
-    executor: ExecutorService,
-    sqlContext: SQLContext,
-    kettleHomePath: String,
-    storeLocation: String): Unit = {
+      storePath: String,
+      compactionModel: CompactionModel,
+      partitioner: Partitioner,
+      executor: ExecutorService,
+      sqlContext: SQLContext,
+      kettleHomePath: String,
+      storeLocation: String): Unit = {
     val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
       carbonLoadModel.getLoadMetadataDetails
     )
@@ -413,10 +396,9 @@ object CarbonDataRDDFactory extends Logging {
           future.get
         }
         )
-      }
-      catch {
+      } catch {
         case e: Exception =>
-          logger.error("Exception in compaction thread " + e.getMessage)
+          logger.error(s"Exception in compaction thread ${ e.getMessage }")
           throw e
       }
 
@@ -442,22 +424,23 @@ object CarbonDataRDDFactory extends Logging {
       )
     }
   }
+
   /**
    * This will submit the loads to be merged into the executor.
    *
    * @param futureList
    */
   def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
-    loadsToMerge: util
-    .List[LoadMetadataDetails],
-    executor: ExecutorService,
-    storePath: String,
-    sqlContext: SQLContext,
-    compactionModel: CompactionModel,
-    kettleHomePath: String,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storeLocation: String): Unit = {
+      loadsToMerge: util
+      .List[LoadMetadataDetails],
+      executor: ExecutorService,
+      storePath: String,
+      sqlContext: SQLContext,
+      compactionModel: CompactionModel,
+      kettleHomePath: String,
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storeLocation: String): Unit = {
 
     loadsToMerge.asScala.foreach(seg => {
       logger.info("loads identified for merge is " + seg.getLoadName)
@@ -484,13 +467,13 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def startCompactionThreads(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storePath: String,
-    kettleHomePath: String,
-    storeLocation: String,
-    compactionModel: CompactionModel,
-    compactionLock: ICarbonLock): Unit = {
+      carbonLoadModel: CarbonLoadModel,
+      partitioner: Partitioner,
+      storePath: String,
+      kettleHomePath: String,
+      storeLocation: String,
+      compactionModel: CompactionModel,
+      compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
     readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -499,138 +482,123 @@ object CarbonDataRDDFactory extends Logging {
     // clean up of the stale segments.
     try {
       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    }
-    catch {
+    } catch {
       case e: Exception =>
-        logger
-          .error("Exception in compaction thread while clean up of stale segments " + e
-            .getMessage
-          )
+        logger.error(s"Exception in compaction thread while clean up of stale segments" +
+                     s" ${ e.getMessage }")
     }
 
-      val compactionThread = new Thread {
-        override def run(): Unit = {
+    val compactionThread = new Thread {
+      override def run(): Unit = {
 
+        try {
+          // compaction status of the table which is triggered by the user.
+          var triggeredCompactionStatus = false
+          var exception: Exception = null
           try {
-            // compaction status of the table which is triggered by the user.
-            var triggeredCompactionStatus = false
-            var exception : Exception = null
-            try {
-              executeCompaction(carbonLoadModel: CarbonLoadModel,
-                storePath: String,
-                compactionModel: CompactionModel,
-                partitioner: Partitioner,
-                executor, sqlContext, kettleHomePath, storeLocation
+            executeCompaction(carbonLoadModel: CarbonLoadModel,
+              storePath: String,
+              compactionModel: CompactionModel,
+              partitioner: Partitioner,
+              executor, sqlContext, kettleHomePath, storeLocation
+            )
+            triggeredCompactionStatus = true
+          } catch {
+            case e: Exception =>
+              logger.error(s"Exception in compaction thread ${ e.getMessage }")
+              exception = e
+          }
+          // continue in case of exception also, check for all the tables.
+          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            ).equalsIgnoreCase("true")
+
+          if (!isConcurrentCompactionAllowed) {
+            logger.info("System level compaction lock is enabled.")
+            val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
+            var tableForCompaction = CarbonCompactionUtil
+              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                .tablesMeta.toArray, skipCompactionTables.toList.asJava
               )
-              triggeredCompactionStatus = true
-            }
-            catch {
-              case e: Exception =>
-                logger.error("Exception in compaction thread " + e.getMessage)
-                exception = e
-            }
-            // continue in case of exception also, check for all the tables.
-            val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-              ).equalsIgnoreCase("true")
-
-            if (!isConcurrentCompactionAllowed) {
-              logger.info("System level compaction lock is enabled.")
-              val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
-              var tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                  .tablesMeta.toArray, skipCompactionTables.toList.asJava
+            while (null != tableForCompaction) {
+              logger.info("Compaction request has been identified for table " +
+                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+              val table: CarbonTable = tableForCompaction.carbonTable
+              val metadataPath = table.getMetaDataFilepath
+              val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
+
+              val newCarbonLoadModel = new CarbonLoadModel()
+              prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                  newCarbonLoadModel.getTableName
                 )
-              while (null != tableForCompaction) {
-                logger
-                  .info("Compaction request has been identified for table " + tableForCompaction
-                    .carbonTable.getDatabaseName + "." + tableForCompaction.carbonTableIdentifier
-                          .getTableName
-                  )
-                val table: CarbonTable = tableForCompaction.carbonTable
-                val metadataPath = table.getMetaDataFilepath
-                val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
-
-                val newCarbonLoadModel = new CarbonLoadModel()
-                prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-                val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
-                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                    newCarbonLoadModel.getTableName
-                  )
-
-                val compactionSize = CarbonDataMergerUtil
-                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
-
-                val newcompactionModel = CompactionModel(compactionSize,
-                  compactionType,
-                  table,
-                  tableCreationTime,
-                  compactionModel.isDDLTrigger
+
+              val compactionSize = CarbonDataMergerUtil
+                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+
+              val newcompactionModel = CompactionModel(compactionSize,
+                compactionType,
+                table,
+                tableCreationTime,
+                compactionModel.isDDLTrigger
+              )
+              // proceed for compaction
+              try {
+                executeCompaction(newCarbonLoadModel,
+                  newCarbonLoadModel.getStorePath,
+                  newcompactionModel,
+                  partitioner,
+                  executor, sqlContext, kettleHomePath, storeLocation
                 )
-                // proceed for compaction
-                try {
-                  executeCompaction(newCarbonLoadModel,
-                    newCarbonLoadModel.getStorePath,
-                    newcompactionModel,
-                    partitioner,
-                    executor, sqlContext, kettleHomePath, storeLocation
-                  )
+              } catch {
+                case e: Exception =>
+                  logger.error("Exception in compaction thread for table " +
+                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                // not handling the exception. only logging as this is not the table triggered
+                // by user.
+              } finally {
+                // delete the compaction required file in case of failure or success also.
+                if (!CarbonCompactionUtil
+                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                  // if the compaction request file could not be deleted then
+                  // add that table's details to the skip list so that it won't be considered next.
+                  skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
+                  logger.error("Compaction request file can not be deleted for table " +
+                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
-                catch {
-                  case e: Exception =>
-                    logger.error("Exception in compaction thread for table " + tableForCompaction
-                      .carbonTable.getDatabaseName + "." +
-                                 tableForCompaction.carbonTableIdentifier
-                                   .getTableName)
-                  // not handling the exception. only logging as this is not the table triggered
-                  // by user.
-                }
-                finally {
-                  // delete the compaction required file in case of failure or success also.
-                  if (!CarbonCompactionUtil
-                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
-                    // if the compaction request file is not been able to delete then
-                    // add those tables details to the skip list so that it wont be considered next.
-                    skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
-                    logger
-                      .error("Compaction request file can not be deleted for table " +
-                             tableForCompaction
-                               .carbonTable.getDatabaseName + "." + tableForCompaction
-                               .carbonTableIdentifier
-                               .getTableName
-                      )
-
-                  }
-                }
-                // ********* check again for all the tables.
-                tableForCompaction = CarbonCompactionUtil
-                  .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                    .tablesMeta.toArray, skipCompactionTables.asJava
-                  )
-              }
-              // giving the user his error for telling in the beeline if his triggered table
-              // compaction is failed.
-              if (!triggeredCompactionStatus) {
-                throw new Exception("Exception in compaction " + exception.getMessage)
               }
+              // ********* check again for all the tables.
+              tableForCompaction = CarbonCompactionUtil
+                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                  .tablesMeta.toArray, skipCompactionTables.asJava
+                )
+            }
+            // giving the user his error for telling in the beeline if his triggered table
+            // compaction is failed.
+            if (!triggeredCompactionStatus) {
+              throw new Exception("Exception in compaction " + exception.getMessage)
             }
           }
-          finally {
-            executor.shutdownNow()
-            deletePartialLoadsInCompaction(carbonLoadModel)
-            compactionLock.unlock()
-          }
+        } finally {
+          executor.shutdownNow()
+          deletePartialLoadsInCompaction(carbonLoadModel)
+          compactionLock.unlock()
         }
       }
+    }
     // calling the run method of a thread to make the call as blocking call.
     // in the future we may make this as concurrent.
     compactionThread.run()
   }
 
   def prepareCarbonLoadModel(storePath: String,
-    table: CarbonTable,
-    newCarbonLoadModel: CarbonLoadModel): Unit = {
+      table: CarbonTable,
+      newCarbonLoadModel: CarbonLoadModel): Unit = {
     newCarbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
     newCarbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -651,13 +619,10 @@ object CarbonDataRDDFactory extends Logging {
     // so deleting those folders.
     try {
       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    }
-    catch {
+    } catch {
       case e: Exception =>
-        logger
-          .error("Exception in compaction thread while clean up of stale segments " + e
-            .getMessage
-          )
+        logger.error(s"Exception in compaction thread while clean up of stale segments" +
+                     s" ${ e.getMessage }")
     }
   }
 
@@ -674,13 +639,11 @@ object CarbonDataRDDFactory extends Logging {
     val isAgg = false
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
-      logger
-        .info("compaction need status is " + CarbonDataMergerUtil.checkIfAutoLoadMergingRequired())
+      logger.info(s"compaction need status is" +
+                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
-        logger
-          .audit("Compaction request received for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.audit(s"Compaction request received for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -717,8 +680,7 @@ object CarbonDataRDDFactory extends Logging {
             carbonTable,
             compactionModel
           )
-        }
-        else {
+        } else {
           val lock = CarbonLockFactory
             .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
               LockUsage.COMPACTION_LOCK
@@ -736,37 +698,34 @@ object CarbonDataRDDFactory extends Logging {
                 compactionModel,
                 lock
               )
-            }
-            catch {
-              case e : Exception =>
-                logger.error("Exception in start compaction thread. " + e.getMessage)
+            } catch {
+              case e: Exception =>
+                logger.error(s"Exception in start compaction thread. ${ e.getMessage }")
                 lock.unlock()
                 throw e
             }
-          }
-          else {
-            logger
-              .audit("Not able to acquire the compaction lock for table " + carbonLoadModel
-                .getDatabaseName + "." + carbonLoadModel.getTableName
-              )
-            logger
-              .error("Not able to acquire the compaction lock for table " + carbonLoadModel
-                .getDatabaseName + "." + carbonLoadModel.getTableName
-              )
+          } else {
+            logger.audit("Not able to acquire the compaction lock for table " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
+            logger.error("Not able to acquire the compaction lock for table " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
           }
         }
       }
     }
 
     try {
-      logger
-        .audit("Data load request has been received for table " + carbonLoadModel
-          .getDatabaseName + "." + carbonLoadModel.getTableName
-        )
+      logger.audit(s"Data load request has been received for table" +
+                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
-        logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
-            .getDatabaseName + "." + carbonLoadModel.getTableName
-          )
+        logger.audit("Data is loading with New Data Flow for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
@@ -801,13 +760,10 @@ object CarbonDataRDDFactory extends Logging {
       // so deleting those folders.
       try {
         CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      }
-      catch {
+      } catch {
         case e: Exception =>
           logger
-            .error("Exception in data load while clean up of stale segments " + e
-              .getMessage
-            )
+            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
@@ -826,14 +782,14 @@ object CarbonDataRDDFactory extends Logging {
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
       var status: Array[(String, LoadMetadataDetails)] = null
 
-      def loadDataFile(): Unit = { isTableSplitPartition match {
-        case true =>
+      def loadDataFile(): Unit = {
+        if (isTableSplitPartition) {
           /*
-           * when data handle by table split partition
-           * 1) get partition files, direct load or not will get the different files path
-           * 2) get files blocks by using SplitUtils
-           * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
-           */
+         * when data handle by table split partition
+         * 1) get partition files, direct load or not will get the different files path
+         * 2) get files blocks by using SplitUtils
+         * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
+         */
           var splits = Array[TableSplit]()
           if (carbonLoadModel.isDirectLoad) {
             // get all table Splits, this part means files were divide to different partitions
@@ -865,7 +821,7 @@ object CarbonDataRDDFactory extends Logging {
                 val pathBuilder = new StringBuilder()
                 pathBuilder.append(carbonLoadModel.getFactFilePath)
                 if (!carbonLoadModel.getFactFilePath.endsWith("/")
-                  && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
+                    && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
                   pathBuilder.append("/")
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
@@ -873,16 +829,15 @@ object CarbonDataRDDFactory extends Logging {
                   SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
-
-        case false =>
+        } else {
           /*
-           * when data load handle by node partition
-           * 1)clone the hadoop configuration,and set the file path to the configuration
-           * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
-           * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
-           * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
-           *   which parititon by host
-           */
+         * when data load handle by node partition
+         * 1)clone the hadoop configuration,and set the file path to the configuration
+         * 2)use NewHadoopRDD to get split,size:Math.max(minSize, Math.min(maxSize, blockSize))
+         * 3)use DummyLoadRDD to group blocks by host,and let spark balance the block location
+         * 4)DummyLoadRDD output (host,Array[BlockDetails])as the parameter to CarbonDataLoadRDD
+         *   which parititon by host
+         */
           val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
           // FileUtils will skip file which is no csv, and return all file path which split by ','
           val filePaths = carbonLoadModel.getFactFilePath
@@ -921,9 +876,11 @@ object CarbonDataRDDFactory extends Logging {
               .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
               .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
-          logInfo("Total Time taken in block allocation : " + timeElapsed)
-          logInfo("Total no of blocks : " + blockList.size
-            + ", No.of Nodes : " + nodeBlockMapping.size
+          logInfo("Total Time taken in block allocation: " + timeElapsed)
+          logInfo(s"Total no of blocks: ${ blockList.length }, No.of Nodes: ${
+            nodeBlockMapping
+              .size
+          }"
           )
           var str = ""
           nodeBlockMapping.foreach(entry => {
@@ -983,7 +940,7 @@ object CarbonDataRDDFactory extends Logging {
         var rdd = dataFrame.get.rdd
         var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
         numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
-        rdd = rdd.coalesce(numPartitions, false)
+        rdd = rdd.coalesce(numPartitions, shuffle = false)
 
         status = new DataFrameLoaderRDD(sqlContext.sparkContext,
           new DataLoadResultImpl(),
@@ -1061,37 +1018,34 @@ object CarbonDataRDDFactory extends Logging {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         logInfo("********clean up done**********")
         logger.audit(s"Data load is failed for " +
-          s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         logWarning("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
-          val metadataDetails = status(0)._2
-          if (!isAgg) {
-            val status = CarbonLoaderUtil
-              .recordLoadMetadata(currentLoadCount,
-                metadataDetails,
-                carbonLoadModel,
-                loadStatus,
-                loadStartTime
-              )
-            if (!status) {
-              val errorMessage = "Dataload failed due to failure in table status updation."
-              logger.audit("Data load is failed for " +
-                           s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-              logger.error("Dataload failed due to failure in table status updation.")
-              throw new Exception(errorMessage)
-            }
-          } else if (!carbonLoadModel.isRetentionRequest) {
-            // TODO : Handle it
-            logInfo("********Database updated**********")
+        val metadataDetails = status(0)._2
+        if (!isAgg) {
+          val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
+            carbonLoadModel, loadStatus, loadStartTime)
+          if (!status) {
+            val errorMessage = "Dataload failed due to failure in table status updation."
+            logger.audit("Data load is failed for " +
+                         s"${ carbonLoadModel.getDatabaseName }.${
+                           carbonLoadModel
+                             .getTableName
+                         }")
+            logger.error("Dataload failed due to failure in table status updation.")
+            throw new Exception(errorMessage)
           }
-          logger.audit("Data load is successful for " +
-                       s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
+        } else if (!carbonLoadModel.isRetentionRequest) {
+          // TODO : Handle it
+          logInfo("********Database updated**********")
+        }
+        logger.audit("Data load is successful for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)
-        }
-        catch {
+        } catch {
           case e: Exception =>
             throw new Exception(
               "Dataload is success. Auto-Compaction has failed. Please check logs.")
@@ -1111,10 +1065,10 @@ object CarbonDataRDDFactory extends Logging {
   }
 
   def deleteLoadsAndUpdateMetadata(
-    carbonLoadModel: CarbonLoadModel,
-    table: CarbonTable, partitioner: Partitioner,
-    storePath: String,
-    isForceDeletion: Boolean) {
+      carbonLoadModel: CarbonLoadModel,
+      table: CarbonTable, partitioner: Partitioner,
+      storePath: String,
+      isForceDeletion: Boolean) {
     if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
       val loadMetadataFilePath = CarbonLoaderUtil
         .extractLoadMetadataFileLocation(carbonLoadModel)
@@ -1132,36 +1086,34 @@ object CarbonDataRDDFactory extends Logging {
 
       if (isUpdationRequired) {
         try {
-        // Update load metadate file after cleaning deleted nodes
-        if (carbonTableStatusLock.lockWithRetries()) {
-          logger.info("Table status lock has been successfully acquired.")
+          // Update load metadate file after cleaning deleted nodes
+          if (carbonTableStatusLock.lockWithRetries()) {
+            logger.info("Table status lock has been successfully acquired.")
 
-          // read latest table status again.
-          val latestMetadata = segmentStatusManager
-            .readLoadMetadata(loadMetadataFilePath)
+            // read latest table status again.
+            val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath)
 
-          // update the metadata details from old to new status.
+            // update the metadata details from old to new status.
+            val latestStatus = CarbonLoaderUtil
+              .updateLoadMetadataFromOldToNew(details, latestMetadata)
 
-          val latestStatus = CarbonLoaderUtil
-            .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-          CarbonLoaderUtil.writeLoadMetadata(
-            carbonLoadModel.getCarbonDataLoadSchema,
-            carbonLoadModel.getDatabaseName,
-            carbonLoadModel.getTableName, latestStatus
-          )
-        }
-        else {
-          val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
-                         "." + carbonLoadModel.getTableName +
-                         ". Not able to acquire the table status lock due to other operation " +
-                         "running in the background."
-          logger.audit(errorMsg)
-          logger.error(errorMsg)
-          throw new Exception(errorMsg + " Please try after some time.")
+            CarbonLoaderUtil.writeLoadMetadata(
+              carbonLoadModel.getCarbonDataLoadSchema,
+              carbonLoadModel.getDatabaseName,
+              carbonLoadModel.getTableName, latestStatus
+            )
+          } else {
+            val errorMsg = "Clean files request is failed for " +
+                           s"${ carbonLoadModel.getDatabaseName }." +
+                           s"${ carbonLoadModel.getTableName }" +
+                           ". Not able to acquire the table status lock due to other operation " +
+                           "running in the background."
+            logger.audit(errorMsg)
+            logger.error(errorMsg)
+            throw new Exception(errorMsg + " Please try after some time.")
 
-        }
-      } finally {
+          }
+        } finally {
           CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
         }
       }
@@ -1197,10 +1149,9 @@ object CarbonDataRDDFactory extends Logging {
           partitioner,
           storePath,
           isForceDeletion = true)
-      }
-      else {
-        val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName +
-                       "." + carbonLoadModel.getTableName +
+      } else {
+        val errorMsg = "Clean files request is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
                        ". Not able to acquire the clean files lock due to another clean files " +
                        "operation is running in the background."
         logger.audit(errorMsg)
@@ -1208,10 +1159,10 @@ object CarbonDataRDDFactory extends Logging {
         throw new Exception(errorMsg + " Please try after some time.")
 
       }
-    }
-    finally {
+    } finally {
       CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
     }
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 8c52249..17b487c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -86,7 +86,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index df40ed7..57bf124 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -77,7 +77,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonLoadPartition]
     val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name : " + s.head + s.length)
+    logInfo("Host Name: " + s.head + s.length)
     s
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b7da579..bce4eb2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifie
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
@@ -67,6 +67,7 @@ trait GenericParser {
 
 case class DictionaryStats(distinctValues: java.util.List[String],
     dictWriteTime: Long, sortIndexWriteTime: Long)
+
 case class PrimitiveParser(dimension: CarbonDimension,
     setOpt: Option[HashSet[String]]) extends GenericParser {
   val (hasDictEncoding, set: HashSet[String]) = setOpt match {
@@ -164,20 +165,21 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
  * A RDD to combine all dictionary distinct values.
  *
  * @constructor create a RDD with RDD[(String, Iterable[String])]
- * @param prev the input RDD[(String, Iterable[String])]
+ * @param prev  the input RDD[(String, Iterable[String])]
  * @param model a model package load info
  */
 class CarbonAllDictionaryCombineRDD(
-                                       prev: RDD[(String, Iterable[String])],
-                                       model: DictionaryLoadModel)
+    prev: RDD[(String, Iterable[String])],
+    model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) with Logging {
 
-  override def getPartitions: Array[Partition] =
+  override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
+  }
 
   override def compute(split: Partition, context: TaskContext
-                      ): Iterator[(Int, ColumnDistinctValues)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName())
+  ): Iterator[(Int, ColumnDistinctValues)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     /*
@@ -240,7 +242,7 @@ class CarbonBlockDistinctValuesCombineRDD(
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     var rowCount = 0L
     try {
@@ -259,7 +261,7 @@ class CarbonBlockDistinctValuesCombineRDD(
           }
         }
       }
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
     } catch {
       case ex: Exception =>
         LOGGER.error(ex)
@@ -288,7 +290,7 @@ class CarbonGlobalDictionaryGenerateRDD(
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     var isHighCardinalityColumn = false
     val iter = new Iterator[(Int, String, Boolean)] {
@@ -303,11 +305,11 @@ class CarbonGlobalDictionaryGenerateRDD(
            model.hdfsTempLocation)
       }
       if (StringUtils.isNotBlank(model.lockType)) {
-         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
-           model.lockType)
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
+          model.lockType)
       }
       if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
-         CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
           model.zooKeeperUrl)
       }
       val dictLock = CarbonLockFactory
@@ -320,7 +322,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         val valuesBuffer = new mutable.HashSet[String]
         val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
         var rowCount = 0L
-        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
         breakable {
           while (rddIter.hasNext) {
             val distinctValueList = rddIter.next()._2
@@ -329,7 +331,7 @@ class CarbonGlobalDictionaryGenerateRDD(
             // check high cardinality
             if (model.isFirstLoad && model.highCardIdentifyEnable
                 && !model.isComplexes(split.index)
-                && model.dimensions(split.index).isColumnar()) {
+                && model.dimensions(split.index).isColumnar) {
               isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
                 valuesBuffer.size, rowCount, model)
               if (isHighCardinalityColumn) {
@@ -338,10 +340,13 @@ class CarbonGlobalDictionaryGenerateRDD(
             }
           }
         }
-        val combineListTime = (System.currentTimeMillis() - t1)
+        val combineListTime = System.currentTimeMillis() - t1
         if (isHighCardinalityColumn) {
-          LOGGER.info("column " + model.table.getTableUniqueName + "." +
-                      model.primDimensions(split.index).getColName + " is high cardinality column")
+          LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
+                      s"${
+                        model.primDimensions(split.index)
+                          .getColName
+                      } is high cardinality column")
         } else {
           isDictionaryLocked = dictLock.lockWithRetries()
           if (isDictionaryLocked) {
@@ -367,7 +372,7 @@ class CarbonGlobalDictionaryGenerateRDD(
           } else {
             null
           }
-          val dictCacheTime = (System.currentTimeMillis - t2)
+          val dictCacheTime = System.currentTimeMillis - t2
           val t3 = System.currentTimeMillis()
           val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
             dictionaryForDistinctValueLookUp,
@@ -375,7 +380,7 @@ class CarbonGlobalDictionaryGenerateRDD(
             split.index)
           // execute dictionary writer task to get distinct values
           val distinctValues = dictWriteTask.execute()
-          val dictWriteTime = (System.currentTimeMillis() - t3)
+          val dictWriteTime = System.currentTimeMillis() - t3
           val t4 = System.currentTimeMillis()
           // if new data came than rewrite sort index file
           if (distinctValues.size() > 0) {
@@ -385,22 +390,21 @@ class CarbonGlobalDictionaryGenerateRDD(
               distinctValues)
             sortIndexWriteTask.execute()
           }
-          val sortIndexWriteTime = (System.currentTimeMillis() - t4)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicShuffleAndWriteTime()
+          val sortIndexWriteTime = System.currentTimeMillis() - t4
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
           // After sortIndex writing, update dictionaryMeta
           dictWriteTask.updateMetaData()
           // clear the value buffer after writing dictionary data
           valuesBuffer.clear
-          org.apache.carbondata.core.util.CarbonUtil
-            .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
           dictionaryForDistinctValueLookUpCleared = true
-          LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
-              "\n columnId:" + model.primDimensions(split.index).getColumnId +
-              "\n new distinct values count:" + distinctValues.size() +
-              "\n combine lists:" + combineListTime +
-              "\n create dictionary cache:" + dictCacheTime +
-              "\n sort list, distinct and write:" + dictWriteTime +
-              "\n write sort info:" + sortIndexWriteTime)
+          LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
+                      s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
+                      s"\n new distinct values count: ${ distinctValues.size() }" +
+                      s"\n combine lists: $combineListTime" +
+                      s"\n create dictionary cache: $dictCacheTime" +
+                      s"\n sort list, distinct and write: $dictWriteTime" +
+                      s"\n write sort info: $sortIndexWriteTime")
         }
       } catch {
         case ex: Exception =>
@@ -408,11 +412,9 @@ class CarbonGlobalDictionaryGenerateRDD(
           throw ex
       } finally {
         if (!dictionaryForDistinctValueLookUpCleared) {
-          org.apache.carbondata.core.util.CarbonUtil
-            .clearDictionaryCache(dictionaryForDistinctValueLookUp);
+          CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
         }
-        org.apache.carbondata.core.util.CarbonUtil
-          .clearDictionaryCache(dictionaryForSortIndexWriting);
+        CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
         if (dictLock != null && isDictionaryLocked) {
           if (dictLock.unlock()) {
             logInfo(s"Dictionary ${
@@ -441,14 +443,17 @@ class CarbonGlobalDictionaryGenerateRDD(
         (split.index, status, isHighCardinalityColumn)
       }
     }
+
     iter
   }
+
 }
+
 /**
  * Set column dictionry patition format
  *
- * @param id  partition id
- * @param dimension  current carbon dimension
+ * @param id        partition id
+ * @param dimension current carbon dimension
  */
 class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
   extends Partition {
@@ -460,13 +465,13 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
 /**
  * Use external column dict to generate global dictionary
  *
- * @param carbonLoadModel  carbon load model
- * @param sparkContext  spark context
- * @param table  carbon table identifier
- * @param dimensions  carbon dimenisons having predefined dict
- * @param hdfsLocation  carbon base store path
+ * @param carbonLoadModel carbon load model
+ * @param sparkContext    spark context
+ * @param table           carbon table identifier
+ * @param dimensions      carbon dimenisons having predefined dict
+ * @param hdfsLocation    carbon base store path
  * @param dictFolderPath  path of dictionary folder
-*/
+ */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictionaryLoadModel: DictionaryLoadModel,
     sparkContext: SparkContext,
@@ -505,25 +510,25 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     } catch {
       case ex: Exception =>
         logError(s"Error in reading pre-defined " +
-          s"dictionary file:${ex.getMessage}")
+                 s"dictionary file:${ ex.getMessage }")
         throw ex
     } finally {
       if (csvReader != null) {
         try {
-          csvReader.close
+          csvReader.close()
         } catch {
           case ex: Exception =>
             logError(s"Error in closing csvReader of " +
-              s"pre-defined dictionary file:${ex.getMessage}")
+                     s"pre-defined dictionary file:${ ex.getMessage }")
         }
       }
       if (inputStream != null) {
         try {
-          inputStream.close
+          inputStream.close()
         } catch {
           case ex: Exception =>
             logError(s"Error in closing inputStream of " +
-              s"pre-defined dictionary file:${ex.getMessage}")
+                     s"pre-defined dictionary file:${ ex.getMessage }")
         }
       }
     }



Mime
View raw message