carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [2/3] incubator-carbondata git commit: remove redundant declaration
Date Tue, 27 Dec 2016 15:41:31 GMT
remove redundant declaration

clean up

clean up

clean up RDDFactory

fix compile

fix style

fix style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6fee9930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6fee9930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6fee9930

Branch: refs/heads/master
Commit: 6fee9930e768d5019c3d6ee3a9a7c0a983011119
Parents: e7b46cc
Author: jackylk <jacky.likun@huawei.com>
Authored: Tue Dec 27 11:27:59 2016 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Tue Dec 27 23:40:18 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/util/CarbonQueryUtil.java  |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/load/DeleteLoadFromMetadata.java      |  44 ----
 .../carbondata/spark/util/CarbonQueryUtil.java  | 248 -------------------
 .../vectorreader/ColumnarVectorWrapper.java     |   2 +-
 .../VectorizedCarbonRecordReader.java           |   8 +-
 .../spark/CarbonColumnValidator.scala           |   2 +-
 .../apache/carbondata/spark/CarbonFilters.scala |  34 +--
 .../carbondata/spark/CarbonSparkFactory.scala   |   4 +-
 .../org/apache/carbondata/spark/KeyVal.scala    |  89 -------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 150 +++++------
 .../spark/sql/CarbonDictionaryDecoder.scala     |  16 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  16 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   2 +-
 .../org/apache/spark/sql/TableCreator.scala     |  29 ++-
 .../execution/CarbonLateDecodeStrategy.scala    |  12 +-
 .../execution/command/carbonTableSchema.scala   |  37 +--
 .../spark/sql/hive/CarbonHiveMetadataUtil.scala |   3 -
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  24 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   1 -
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableLoader.scala     |   4 +-
 .../AllDataTypesTestCaseAggregate.scala         |  19 +-
 .../spark/sql/common/util/QueryTest.scala       |   4 +-
 28 files changed, 242 insertions(+), 668 deletions(-)
----------------------------------------------------------------------
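
Note: the two 150-line entries above are the spark and spark2 copies of CarbonDataRDDFactory.scala, where the "fix style" changes mostly re-indent wrapped call chains and concatenated log strings so that continuation lines sit a fixed four spaces past the statement's indent rather than lining up under the operator or operand above. The remaining entries delete three files the commit treats as redundant (DeleteLoadFromMetadata.java, the spark2 copy of CarbonQueryUtil.java, KeyVal.scala) and drop redundant modifiers, parentheses, and "new" keywords elsewhere. A minimal, self-contained sketch of the indentation rule (hypothetical helper, not CarbonData code), mirroring the getTableCreationTime hunks below:

    object IndentSketch {
      def lookup(db: String, table: String): Long = 0L

      // before this commit the continuation was indented two spaces (or aligned
      // under the operand above):
      //   val t = this
      //     .lookup("default", "t1")

      // after: continuation lines sit four spaces past the statement's own indent
      val t = this
          .lookup("default", "t1")
    }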


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
index d2e716f..9d1a281 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -39,7 +39,7 @@ import org.apache.commons.lang3.StringUtils;
 /**
  * This utilty parses the Carbon query plan to actual query model object.
  */
-public final class CarbonQueryUtil {
+public class CarbonQueryUtil {
 
   private CarbonQueryUtil() {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 93194c8..ff7bf23 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -81,10 +81,10 @@ object CarbonDataRDDFactory {
     }
 
     LOGGER.audit(s"Compaction request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.get.carbonMetastore
-      .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -102,10 +102,10 @@ object CarbonDataRDDFactory {
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-      )
-      .equalsIgnoreCase("true")
+        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+        )
+        .equalsIgnoreCase("true")
 
     // if system level compaction is enabled then only one compaction can run in the system
     // if any other request comes at this time then it will create a compaction request file.
@@ -124,13 +124,13 @@ object CarbonDataRDDFactory {
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.COMPACTION_LOCK
-        )
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.COMPACTION_LOCK
+          )
 
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
-                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -147,9 +147,9 @@ object CarbonDataRDDFactory {
         }
       } else {
         LOGGER.audit("Not able to acquire the compaction lock for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
@@ -164,12 +164,12 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
-      .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
-      )
+        .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+          LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+        )
     if (lock.lockWithRetries()) {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
-                  s".${ carbonLoadModel.getTableName }")
+          s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -190,20 +190,20 @@ object CarbonDataRDDFactory {
       }
     } else {
       LOGGER.audit("Not able to acquire the system level compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-        .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         sys.error("Compaction is in progress, compaction request for table " +
-                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                  " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       } else {
         LOGGER.error("Compaction is in progress, compaction request for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                     " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       }
     }
   }
@@ -226,7 +226,7 @@ object CarbonDataRDDFactory {
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
+            s" ${ e.getMessage }")
     }
 
     val compactionThread = new Thread {
@@ -250,9 +250,9 @@ object CarbonDataRDDFactory {
           }
           // continue in case of exception also, check for all the tables.
           val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-            ).equalsIgnoreCase("true")
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+              ).equalsIgnoreCase("true")
 
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
@@ -262,8 +262,8 @@ object CarbonDataRDDFactory {
                   skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
-                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
               val table: CarbonTable = tableForCompaction.carbonTable
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -271,12 +271,12 @@ object CarbonDataRDDFactory {
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
               val tableCreationTime = CarbonEnv.get.carbonMetastore
-                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                  newCarbonLoadModel.getTableName
-                )
+                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                    newCarbonLoadModel.getTableName
+                  )
 
               val compactionSize = CarbonDataMergerUtil
-                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -294,27 +294,27 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
                 // delete the compaction required file in case of failure or success also.
                 if (!CarbonCompactionUtil
-                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
                   skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                  .tablesMeta.toArray, skipCompactionTables.asJava
-                )
+                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+                      .tablesMeta.toArray, skipCompactionTables.asJava
+                  )
             }
             // giving the user his error for telling in the beeline if his triggered table
             // compaction is failed.
@@ -347,10 +347,10 @@ object CarbonDataRDDFactory {
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
       LOGGER.info(s"compaction need status is" +
-                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+          s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
         LOGGER.audit(s"Compaction request received for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -370,10 +370,10 @@ object CarbonDataRDDFactory {
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
         val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-          )
-          .equalsIgnoreCase("true")
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            )
+            .equalsIgnoreCase("true")
 
         if (!isConcurrentCompactionAllowed) {
 
@@ -388,9 +388,9 @@ object CarbonDataRDDFactory {
           )
         } else {
           val lock = CarbonLockFactory
-            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-              LockUsage.COMPACTION_LOCK
-            )
+              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+                LockUsage.COMPACTION_LOCK
+              )
 
           if (lock.lockWithRetries()) {
             LOGGER.info("Acquired the compaction lock.")
@@ -411,15 +411,15 @@ object CarbonDataRDDFactory {
             }
           } else {
             LOGGER.audit("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
           }
         }
       }
@@ -427,10 +427,10 @@ object CarbonDataRDDFactory {
 
     try {
       LOGGER.audit(s"Data load request has been received for table" +
-                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
         LOGGER.audit("Data is loading with New Data Flow for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -468,16 +468,16 @@ object CarbonDataRDDFactory {
       } catch {
         case e: Exception =>
           LOGGER
-            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
       val loadStartTime = CarbonLoaderUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       val tableCreationTime = CarbonEnv.get.carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
-        .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -528,7 +528,7 @@ object CarbonDataRDDFactory {
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
                 (split.getPartition.getUniqueID,
-                  SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+                    SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
         } else {
@@ -570,15 +570,15 @@ object CarbonDataRDDFactory {
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil
-            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
           val nodeBlockMapping =
             CarbonLoaderUtil
-              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-              .toSeq
+                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+                .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
           LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
           LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-                      s"No.of Nodes: ${nodeBlockMapping.size}")
+              s"No.of Nodes: ${nodeBlockMapping.size}")
           var str = ""
           nodeBlockMapping.foreach(entry => {
             val tableBlock = entry._2
@@ -588,7 +588,7 @@ object CarbonDataRDDFactory {
                 hostentry.equalsIgnoreCase(entry._1)
               )) {
                 str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                  .foldLeft("")((a, b) => a + "," + b)
+                    .foldLeft("")((a, b) => a + "," + b)
               }
             )
             str = str + "\n"
@@ -743,7 +743,7 @@ object CarbonDataRDDFactory {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
@@ -754,10 +754,10 @@ object CarbonDataRDDFactory {
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
             throw new Exception(errorMessage)
           }
@@ -766,7 +766,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
         LOGGER.audit("Data load is successful for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
deleted file mode 100644
index 0926e1c..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON Data Processor
- * Author    : R00903928
- * Created Date  : 21-Sep-2015
- * FileName   : DeleteLoadFromMetadata.java
- * Description   : Kettle step to generate MD Key
- * Class Version  : 1.0
- */
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
-  private DeleteLoadFromMetadata() {
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index 04ef665..0000000
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utilty parses the Carbon query plan to actual query model object.
- */
-public final class CarbonQueryUtil {
-
-  private CarbonQueryUtil() {
-
-  }
-
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) throws IOException {
-
-    //Just create splits depends on locations of region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location =
-              QueryPartitionHelper.getInstance().getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
-      int partitionCount) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    FileType fileType = FileFactory.getFileType(sourcePath);
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
-    loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-    return splits;
-  }
-
-  public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
-      throws Exception {
-
-    if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
-      throw new Exception("Source file doesn't exist at path: " + sourcePath);
-    }
-
-    CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
-    if (file.isDirectory()) {
-      CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile pathname) {
-          return true;
-        }
-      });
-      for (int i = 0; i < fileNames.length; i++) {
-        getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
-      }
-    } else {
-      // add only csv files
-      if (file.getName().endsWith("csv")) {
-        partitionsFiles.add(file.getPath());
-      }
-    }
-  }
-
-  /**
-   * split sourcePath by comma
-   */
-  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
-      String separator) {
-    if (StringUtils.isNotEmpty(sourcePath)) {
-      String[] files = sourcePath.split(separator);
-      for (String file : files) {
-        partitionsFiles.add(file);
-      }
-    }
-  }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % 1).add(files.get(i));
-    }
-    return partitionList;
-  }
-
-  private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
-      int partitionCount) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
-    int startIndex = 0;
-    int endIndex = 0;
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    if (numberOfFilesPerPartition != null) {
-      for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
-        List<String> partitionFiles =
-            new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        endIndex += numberOfFilesPerPartition[i];
-        for (int j = startIndex; j < endIndex; j++) {
-          partitionFiles.add(files.get(j));
-        }
-        startIndex += numberOfFilesPerPartition[i];
-        partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
-      }
-    }
-    return partitionList;
-  }
-
-  private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
-    int div = numberOfFiles / partitionCount;
-    int mod = numberOfFiles % partitionCount;
-    int[] numberOfNodeToScan = null;
-    if (div > 0) {
-      numberOfNodeToScan = new int[partitionCount];
-      Arrays.fill(numberOfNodeToScan, div);
-    } else if (mod > 0) {
-      numberOfNodeToScan = new int[mod];
-    }
-    for (int i = 0; i < mod; i++) {
-      numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
-    }
-    return numberOfNodeToScan;
-  }
-
-  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
-    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    if (null != details) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          slices.add(loadName);
-        }
-      }
-    }
-    return slices;
-  }
-
-  /**
-   * This method will clear the dictionary cache for a given map of columns and dictionary cache
-   * mapping
-   *
-   * @param columnToDictionaryMap
-   */
-  public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
-    for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
-      CarbonUtil.clearDictionaryCache(entry.getValue());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 84e5c07..5ed7389 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.types.Decimal;
 
-public class ColumnarVectorWrapper implements CarbonColumnVector {
+class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private ColumnVector columnVector;
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index ba02bca..1beea97 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -55,7 +55,7 @@ import org.apache.spark.sql.types.StructType;
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
  * carbondata column APIs and fills the data directly into columns.
  */
-public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
+class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
 
   private int batchIdx = 0;
 
@@ -166,7 +166,7 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
    * before any calls to nextKeyValue/nextBatch.
    */
 
-  public void initBatch(MemoryMode memMode) {
+  private void initBatch(MemoryMode memMode) {
     List<QueryDimension> queryDimension = queryModel.getQueryDimension();
     List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
     StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
@@ -232,14 +232,14 @@ public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
   /*
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
-  public void enableReturningBatches() {
+  private void enableReturningBatches() {
     returnColumnarBatch = true;
   }
 
   /**
    * Advances to the next batch of rows. Returns false if there are no more.
    */
-  public boolean nextBatch() throws IOException {
+  private boolean nextBatch() {
     columnarBatch.reset();
     carbonColumnarBatch.reset();
     if (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
index ea97bca..31bbf19 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -23,7 +23,7 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
   * Carbon column validator
   */
 class CarbonColumnValidator extends ColumnValidator {
-  def validateColumns(allColumns: Seq[ColumnSchema]) {
+  def validateColumns(allColumns: Seq[ColumnSchema]): Unit = {
     allColumns.foreach { columnSchema =>
       val colWithSameId = allColumns.filter { x =>
         x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
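
Note: the CarbonColumnValidator change above replaces Scala's procedure syntax (a method body with no "=" and an implied Unit result) with an explicit ": Unit =" result type; procedure syntax was later deprecated and is gone in Scala 3, so the explicit form is the safer style. A tiny self-contained sketch of the two forms (hypothetical class, not the CarbonData API):

    class ValidatorSketch {
      // old form, procedure syntax -- the Unit result type is implied:
      //   def check(xs: Seq[String]) { require(xs.nonEmpty) }

      // form used after this commit -- the result type is spelled out:
      def check(xs: Seq[String]): Unit = {
        require(xs.nonEmpty, "column list must not be empty")
      }
    }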

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 6d9fb24..0a84891 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper}
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 
@@ -151,13 +151,13 @@ object CarbonFilters {
         case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
           Some(sources.EqualTo(a.name, v))
 
-        case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+        case Not(EqualTo(a: Attribute, Literal(v, t))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+        case Not(EqualTo(Literal(v, t), a: Attribute)) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
             Some(sources.Not(sources.EqualTo(a.name, v)))
         case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
         case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
@@ -221,7 +221,7 @@ object CarbonFilters {
           None
       }
     }
-    filters.flatMap(translate(_, false)).toArray
+    filters.flatMap(translate(_)).toArray
   }
 
   def processExpression(exprs: Seq[Expression],
@@ -231,8 +231,8 @@ object CarbonFilters {
     def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
       expr match {
         case or@ Or(left, right) =>
-          val leftFilter = transformExpression(left, true)
-          val rightFilter = transformExpression(right, true)
+          val leftFilter = transformExpression(left, or = true)
+          val rightFilter = transformExpression(right, or = true)
           if (leftFilter.isDefined && rightFilter.isDefined) {
             Some(new OrExpression(leftFilter.get, rightFilter.get))
           } else {
@@ -247,22 +247,22 @@ object CarbonFilters {
           (transformExpression(left) ++ transformExpression(right)).reduceOption(new
               AndExpression(_, _))
 
-        case EqualTo(a: Attribute, l@Literal(v, t)) => new
+        case EqualTo(a: Attribute, l@Literal(v, t)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), a: Attribute) => new
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
             Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
 
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
             Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
         case IsNotNull(child: Attribute) =>
             Some(new NotEqualsExpression(transformExpression(child).get,
@@ -357,7 +357,7 @@ object CarbonFilters {
           None
       }
     }
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+    exprs.flatMap(transformExpression(_)).reduceOption(new AndExpression(_, _))
   }
   private def isNullLiteral(exp: Expression): Boolean = {
     if (null != exp
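
Note: the CarbonFilters hunks above remove two kinds of redundancy: a stray "new" before Some(...) at the end of several case arms (new Some(x) calls the case-class constructor directly and produces the same value as the Some(x) factory, so the keyword adds nothing), and positional boolean arguments, which are either passed by name (or = true) or dropped where a default value makes the argument redundant; the braces around the single-class AttributeReferenceWrapper import go for the same reason. A minimal self-contained sketch (simplified signatures, not the CarbonFilters API):

    object FilterSketch {
      def translate(f: String, negate: Boolean = false): Option[String] =
        if (negate) Some("NOT " + f) else Some(f)

      // before: case f => new Some(f)  and  translate(expr, false)
      // after:  case f => Some(f)      and  translate(expr) / translate(expr, negate = true)
      val negated = translate("a = 1", negate = true)
      val kept = Seq("a = 1", "b = 2").flatMap(translate(_))
    }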

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
index 7618558..6e3a1c8 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -47,14 +47,14 @@ object CarbonSparkFactory {
    /**
     * @return column validator
     */
-  def getCarbonColumnValidator(): ColumnValidator = {
+  def getCarbonColumnValidator: ColumnValidator = {
     new CarbonColumnValidator
   }
 
   /**
    * @return dictionary helper
    */
-  def getDictionaryDetailService(): DictionaryDetailService = {
+  def getDictionaryDetailService: DictionaryDetailService = {
     new DictionaryDetailHelper
   }
 }
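
Note: in CarbonSparkFactory the empty parameter lists are dropped from getCarbonColumnValidator and getDictionaryDetailService; by Scala convention a parameterless def without "()" signals a side-effect-free accessor, while "()" is kept for methods with effects. A small self-contained sketch of the convention (hypothetical factory, not the CarbonData API):

    trait ColumnCheck { def ok: Boolean = true }

    object FactorySketch {
      // before: def getColumnCheck(): ColumnCheck = new ColumnCheck {}
      // after, a pure accessor, so no parentheses:
      def getColumnCheck: ColumnCheck = new ColumnCheck {}
    }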

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * It is just Key value class. I don't get any other alternate to make the RDD class to
- * work with my minimum knowledge in scala.
- * May be I will remove later once I gain good knowledge :)
- *
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
-  def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
-  override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
-  def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
-  override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
-  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
-    (key, value)
-  }
-}
-
-
-trait PartitionResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
-  override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index de07707..f451a54 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -79,10 +79,10 @@ object CarbonDataRDDFactory {
     }
 
     LOGGER.audit(s"Compaction request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val tableCreationTime = CarbonEnv.get.carbonMetastore
-      .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
@@ -100,10 +100,10 @@ object CarbonDataRDDFactory {
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-      )
-      .equalsIgnoreCase("true")
+        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+        )
+        .equalsIgnoreCase("true")
 
     // if system level compaction is enabled then only one compaction can run in the system
     // if any other request comes at this time then it will create a compaction request file.
@@ -122,13 +122,13 @@ object CarbonDataRDDFactory {
     } else {
       // normal flow of compaction
       val lock = CarbonLockFactory
-        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-          LockUsage.COMPACTION_LOCK
-        )
+          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+            LockUsage.COMPACTION_LOCK
+          )
 
       if (lock.lockWithRetries()) {
         LOGGER.info("Acquired the compaction lock for table" +
-                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
@@ -145,9 +145,9 @@ object CarbonDataRDDFactory {
         }
       } else {
         LOGGER.audit("Not able to acquire the compaction lock for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         sys.error("Table is already locked for compaction. Please try after some time.")
       }
     }
@@ -162,12 +162,12 @@ object CarbonDataRDDFactory {
       carbonTable: CarbonTable,
       compactionModel: CompactionModel): Unit = {
     val lock = CarbonLockFactory
-      .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
-        LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
-      )
+        .getCarbonLockObj(CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
+          LockUsage.SYSTEMLEVEL_COMPACTION_LOCK
+        )
     if (lock.lockWithRetries()) {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
-                  s".${ carbonLoadModel.getTableName }")
+          s".${ carbonLoadModel.getTableName }")
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
@@ -188,20 +188,20 @@ object CarbonDataRDDFactory {
       }
     } else {
       LOGGER.audit("Not able to acquire the system level compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       CarbonCompactionUtil
-        .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
+          .createCompactionRequiredFile(carbonTable.getMetaDataFilepath, compactionType)
       // do sys error only in case of DDL trigger.
       if (compactionModel.isDDLTrigger) {
         sys.error("Compaction is in progress, compaction request for table " +
-                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                  " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       } else {
         LOGGER.error("Compaction is in progress, compaction request for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                     " is in queue.")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+            " is in queue.")
       }
     }
   }
@@ -224,7 +224,7 @@ object CarbonDataRDDFactory {
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-                     s" ${ e.getMessage }")
+            s" ${ e.getMessage }")
     }
 
     val compactionThread = new Thread {
@@ -248,9 +248,9 @@ object CarbonDataRDDFactory {
           }
           // continue in case of exception also, check for all the tables.
           val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-            ).equalsIgnoreCase("true")
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+              ).equalsIgnoreCase("true")
 
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
@@ -260,8 +260,8 @@ object CarbonDataRDDFactory {
                   skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
-                          s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                          s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                  s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                  s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
               val table: CarbonTable = tableForCompaction.carbonTable
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
@@ -269,12 +269,12 @@ object CarbonDataRDDFactory {
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
               val tableCreationTime = CarbonEnv.get.carbonMetastore
-                .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                  newCarbonLoadModel.getTableName
-                )
+                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                    newCarbonLoadModel.getTableName
+                  )
 
               val compactionSize = CarbonDataMergerUtil
-                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -292,27 +292,27 @@ object CarbonDataRDDFactory {
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 // not handling the exception. only logging as this is not the table triggered
                 // by user.
               } finally {
                 // delete the compaction required file in case of failure or success also.
                 if (!CarbonCompactionUtil
-                  .deleteCompactionRequiredFile(metadataPath, compactionType)) {
+                    .deleteCompactionRequiredFile(metadataPath, compactionType)) {
                   // if the compaction request file is not been able to delete then
                   // add those tables details to the skip list so that it wont be considered next.
                   skipCompactionTables.+=:(tableForCompaction.carbonTableIdentifier)
                   LOGGER.error("Compaction request file can not be deleted for table " +
-                               s"${ tableForCompaction.carbonTable.getDatabaseName }." +
-                               s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
+                      s"${ tableForCompaction.carbonTable.getDatabaseName }." +
+                      s"${ tableForCompaction.carbonTableIdentifier.getTableName }")
                 }
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                  .tablesMeta.toArray, skipCompactionTables.asJava
-                )
+                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
+                      .tablesMeta.toArray, skipCompactionTables.asJava
+                  )
             }
             // giving the user his error for telling in the beeline if his triggered table
             // compaction is failed.
@@ -345,10 +345,10 @@ object CarbonDataRDDFactory {
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
       LOGGER.info(s"compaction need status is" +
-                  s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+          s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
       if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
         LOGGER.audit(s"Compaction request received for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         val compactionSize = 0
         val isCompactionTriggerByDDl = false
         val compactionModel = CompactionModel(compactionSize,
@@ -368,10 +368,10 @@ object CarbonDataRDDFactory {
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
         val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-          )
-          .equalsIgnoreCase("true")
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+            )
+            .equalsIgnoreCase("true")
 
         if (!isConcurrentCompactionAllowed) {
 
@@ -386,9 +386,9 @@ object CarbonDataRDDFactory {
           )
         } else {
           val lock = CarbonLockFactory
-            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-              LockUsage.COMPACTION_LOCK
-            )
+              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+                LockUsage.COMPACTION_LOCK
+              )
 
           if (lock.lockWithRetries()) {
             LOGGER.info("Acquired the compaction lock.")
@@ -409,15 +409,15 @@ object CarbonDataRDDFactory {
             }
           } else {
             LOGGER.audit("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
           }
         }
       }
@@ -425,10 +425,10 @@ object CarbonDataRDDFactory {
 
     try {
       LOGGER.audit(s"Data load request has been received for table" +
-                   s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       if (!useKettle) {
         LOGGER.audit("Data is loading with New Data Flow for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
@@ -466,16 +466,16 @@ object CarbonDataRDDFactory {
       } catch {
         case e: Exception =>
           LOGGER
-            .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
+              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
       }
 
       // reading the start time of data load.
       val loadStartTime = CarbonLoaderUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
       val tableCreationTime = CarbonEnv.get.carbonMetastore
-        .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
-        .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+          .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
@@ -526,7 +526,7 @@ object CarbonDataRDDFactory {
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
                 (split.getPartition.getUniqueID,
-                  SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+                    SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
         } else {
@@ -568,15 +568,15 @@ object CarbonDataRDDFactory {
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil
-            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
           val nodeBlockMapping =
             CarbonLoaderUtil
-              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-              .toSeq
+                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+                .toSeq
           val timeElapsed: Long = System.currentTimeMillis - startTime
           LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
           LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-                      s"No.of Nodes: ${nodeBlockMapping.size}")
+              s"No.of Nodes: ${nodeBlockMapping.size}")
           var str = ""
           nodeBlockMapping.foreach(entry => {
             val tableBlock = entry._2
@@ -586,7 +586,7 @@ object CarbonDataRDDFactory {
                 hostentry.equalsIgnoreCase(entry._1)
               )) {
                 str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                  .foldLeft("")((a, b) => a + "," + b)
+                    .foldLeft("")((a, b) => a + "," + b)
               }
             )
             str = str + "\n"
@@ -723,7 +723,7 @@ object CarbonDataRDDFactory {
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
         throw new Exception(errorMessage)
       } else {
@@ -734,10 +734,10 @@ object CarbonDataRDDFactory {
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${
-                           carbonLoadModel
-                             .getTableName
-                         }")
+                s"${ carbonLoadModel.getDatabaseName }.${
+                  carbonLoadModel
+                      .getTableName
+                }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
             throw new Exception(errorMessage)
           }
@@ -746,7 +746,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
         LOGGER.audit("Data load is successful for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)
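
Note: the CarbonDataRDDFactory hunks above are pure indentation reflows — wrapped call chains and multi-line interpolated strings now use a fixed continuation indent instead of being aligned under the receiver or the opening quote. A minimal sketch of the style, with hypothetical database/table values that are not taken from the commit:

    object ContinuationIndentExample {
      def main(args: Array[String]): Unit = {
        val databaseName = "default"        // hypothetical values, for illustration only
        val tableName = "sample_table"
        // a wrapped interpolated string continues with a fixed indent after the `+`,
        // rather than being aligned under the opening quote of the first fragment
        val message = s"Not able to acquire the compaction lock for table " +
            s"$databaseName.$tableName"
        println(message)
      }
    }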

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ce5962d..8deacc0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -159,10 +159,10 @@ case class CarbonDictionaryDecoder(
           // add a task completion listener to clear dictionary that is a decisive factor for
           // LRU eviction policy
           val dictionaryTaskCleaner = TaskContext.get
-          dictionaryTaskCleaner.addTaskCompletionListener(context =>
+          dictionaryTaskCleaner.addTaskCompletionListener(_ =>
             dicts.foreach { dictionary =>
               if (null != dictionary) {
-                dictionary.clear
+                dictionary.clear()
               }
             }
           )
@@ -312,10 +312,10 @@ class CarbonDecoderRDD(
     // add a task completion listener to clear dictionary that is a decisive factor for
     // LRU eviction policy
     val dictionaryTaskCleaner = TaskContext.get
-    dictionaryTaskCleaner.addTaskCompletionListener(context =>
+    dictionaryTaskCleaner.addTaskCompletionListener(_ =>
       dicts.foreach { dictionary =>
         if (null != dictionary) {
-          dictionary.clear
+          dictionary.clear()
         }
       }
     )
@@ -327,7 +327,6 @@ class CarbonDecoderRDD(
       override final def hasNext: Boolean = iter.hasNext
 
       override final def next(): InternalRow = {
-        val startTime = System.currentTimeMillis()
         val row: InternalRow = iter.next()
         val data = row.toSeq(dataTypes).toArray
         dictIndex.foreach { index =>
@@ -342,13 +341,6 @@ class CarbonDecoderRDD(
     }
   }
 
-  private def isRequiredToDecode = {
-    getDictionaryColumnIds.find(p => p._1 != null) match {
-      case Some(value) => true
-      case _ => false
-    }
-  }
-
   private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
                             cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
     val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
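
Note: the CarbonDictionaryDecoder hunks keep the cleanup idea the in-code comment describes — register a task-completion listener so dictionaries can be released for LRU eviction — and only rename the unused listener parameter to `_` and call the side-effecting `clear()` with parentheses. A rough sketch of the idiom, with a hypothetical Resource type standing in for Dictionary (not the CarbonData API):

    import org.apache.spark.TaskContext

    class Resource { def close(): Unit = println("released") }   // hypothetical stand-in type

    def registerTaskCleanup(resources: Seq[Resource]): Unit = {
      val ctx = TaskContext.get()                 // null when not running inside a Spark task
      if (ctx != null) {
        ctx.addTaskCompletionListener { _ =>      // listener argument unused, hence `_`
          resources.foreach { r =>
            if (r != null) r.close()              // side-effecting call written with ()
          }
        }
      }
    }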

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 748d292..67ee478 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -50,10 +50,10 @@ object CarbonSession {
 
       // Get the session from current thread's active session.
       var session: SparkSession = SparkSession.getActiveSession match {
-        case Some(session) =>
-          if ((session ne null) && !session.sparkContext.isStopped) {
-            options.foreach { case (k, v) => session.conf.set(k, v) }
-            session
+        case Some(sparkSession) =>
+          if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+            options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+            sparkSession
           } else {
             null
           }
@@ -67,10 +67,10 @@ object CarbonSession {
       SparkSession.synchronized {
         // If the current thread does not have an active session, get it from the global session.
         session = SparkSession.getDefaultSession match {
-          case Some(session) =>
-            if ((session ne null) && !session.sparkContext.isStopped) {
-              options.foreach { case (k, v) => session.conf.set(k, v) }
-              session
+          case Some(sparkSession) =>
+            if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+              options.foreach { case (k, v) => sparkSession.conf.set(k, v) }
+              sparkSession
             } else {
               null
             }
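
Note: the rename in the two CarbonSession hunks only avoids shadowing — the outer `session` variable and the binder introduced by `case Some(session)` shared a name, so `sparkSession` makes it unambiguous which value the body refers to. A tiny illustration of the pitfall (hypothetical example, not CarbonData code):

    var session: String = null                       // outer slot being assigned
    val active: Option[String] = Some("active-session")
    session = active match {
      case Some(sparkSession) =>                     // distinct binder name, no shadowing of `session`
        if (sparkSession.nonEmpty) sparkSession else null
      case None => null
    }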

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index c78ddf3..8a946c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -130,7 +130,7 @@ class CarbonSource extends CreatableRelationProvider
           }
           f
         }
-        val map = scala.collection.mutable.Map[String, String]();
+        val map = scala.collection.mutable.Map[String, String]()
         parameters.foreach { x => map.put(x._1, x._2) }
         val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
         CreateTable(cm, false).run(sparkSession)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
index 1faaafa..362c951 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala
@@ -62,9 +62,9 @@ object TableCreator {
     // All excluded cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
       dictExcludeCols
-        .map { dictExcludeCol =>
+        .foreach { dictExcludeCol =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
             val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
               " does not exist in table. Please check create table statement."
@@ -87,8 +87,8 @@ object TableCreator {
     // All included cols should be there in create table cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
       dictIncludeCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
-      dictIncludeCols.map { distIncludeCol =>
+        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
+      dictIncludeCols.foreach { distIncludeCol =>
         if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
           val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
             " does not exist in table. Please check create table statement."
@@ -117,9 +117,9 @@ object TableCreator {
         }
         dimFields += field
       } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
-        dimFields += (field)
+        dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
-        dimFields += (field)
+        dimFields += field
       }
     }
     )
@@ -143,13 +143,13 @@ object TableCreator {
     // get all included cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
       dictIncludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
     }
 
     // get all excluded cols
     if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
       dictExcludedCols =
-        tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
+        tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
     }
 
     // by default consider all non string cols as msrs. consider all include/ exclude cols as dims
@@ -264,7 +264,7 @@ object TableCreator {
     if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {
 
       var splittedColGrps: Seq[String] = Seq[String]()
-      val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
+      val nonSplitCols: String = tableProperties(CarbonCommonConstants.COLUMN_GROUPS)
 
       // row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
       // here first splitting the value by () . so that the above will be splitted into 2 strings.
@@ -313,9 +313,8 @@ object TableCreator {
 
     if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
       noInvertedIdxColsProps =
-        tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
-      noInvertedIdxColsProps
-        .map { noInvertedIdxColProp =>
+        tableProperties("NO_INVERTED_INDEX").split(',').map(_.trim)
+      noInvertedIdxColsProps.foreach { noInvertedIdxColProp =>
           if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
             val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
               " does not exist in table. Please check create table statement."
@@ -357,11 +356,11 @@ object TableCreator {
         field.storeType
       )
       case "array" => Field(field.column, Some("Array"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
+        field.children.map(f => f.map(normalizeType)),
         field.parent, field.storeType
       )
       case "struct" => Field(field.column, Some("Struct"), field.name,
-        field.children.map(f => f.map(normalizeType(_))),
+        field.children.map(f => f.map(normalizeType)),
         field.parent, field.storeType
       )
       case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
@@ -372,7 +371,7 @@ object TableCreator {
       // checking if the nested data type contains the child type as decimal(10,0),
       // if it is present then extracting the precision and scale. resetting the data type
       // with Decimal.
-      case _ if (dataType.startsWith("decimal")) =>
+      case _ if dataType.startsWith("decimal") =>
         val (precision, scale) = getScaleAndPrecision(dataType)
         Field(field.column,
           Some("Decimal"),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 51b79c5..fe8bbe7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -92,7 +92,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       rdd: RDD[InternalRow],
       needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
-    if (needDecode.size > 0) {
+    if (needDecode.nonEmpty) {
       rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
       getDecoderRDD(relation, needDecode, rdd, output)
     } else {
@@ -249,7 +249,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       needDecoder: ArrayBuffer[AttributeReference],
       updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
-        needDecoder.length == 0) {
+        needDecoder.isEmpty) {
       BatchedDataSourceScanExec(
         output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
@@ -362,13 +362,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
         Some(sources.EqualTo(a.name, v))
 
-      case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+      case Not(EqualTo(a: Attribute, Literal(v, t))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+      case Not(EqualTo(Literal(v, t), a: Attribute)) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
-      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
           Some(sources.Not(sources.EqualTo(a.name, v)))
       case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
       case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
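
Note: the CarbonLateDecodeStrategy hunks drop a stray `new` that had been left dangling in front of `Some(...)`; case classes such as `Some`, `Not` and `EqualTo` are built through their `apply` methods, so the keyword added nothing and split the expression awkwardly across lines. A simplified sketch of the translation pattern, using local stand-in types rather than the Spark `sources` API:

    sealed trait Filter                                   // stand-in for org.apache.spark.sql.sources.Filter
    case class EqualTo(attribute: String, value: Any) extends Filter
    case class Not(child: Filter) extends Filter

    // translate a "not equal" predicate; no `new` needed for case classes
    def translateNotEqualTo(attribute: String, value: Any): Option[Filter] =
      Some(Not(EqualTo(attribute, value)))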

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6fee9930/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8f97961..86bd92b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -168,7 +168,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
             s"""CREATE TABLE $dbName.$tbName
                 |(${(cm.dimCols ++ cm.msrCols).map(f => f.rawSchema).mkString(",")})
                 |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "${dbName}", tablePath "$tablePath") """)
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -214,14 +214,6 @@ case class DeleteLoadsById(
 
   }
 
-  // validates load ids
-  private def validateLoadIds: Unit = {
-    if (loadids.isEmpty) {
-      val errorMessage = "Error: Segment id(s) should not be empty."
-      throw new MalformedCarbonCommandException(errorMessage)
-
-    }
-  }
 }
 
 case class DeleteLoadsByLoadDate(
@@ -293,8 +285,8 @@ case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: Lo
       relation.carbonRelation.tableName,
       null,
       Seq(),
-      scala.collection.immutable.Map(("fileheader" -> header)),
-      false,
+      scala.collection.immutable.Map("fileheader" -> header),
+      isOverwriteExist = false,
       null,
       Some(df)).run(sparkSession)
     // updating relation metadata. This is in case of auto detect high cardinality
@@ -326,7 +318,6 @@ case class LoadTable(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
@@ -378,9 +369,9 @@ case class LoadTable(
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 
-      var partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
+          relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+          relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
 
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -412,15 +403,6 @@ case class LoadTable(
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
       val dateFormat = options.getOrElse("dateformat", null)
       validateDateFormat(dateFormat, table)
-      val multiLine = options.getOrElse("multiline", "false").trim.toLowerCase match {
-        case "true" => true
-        case "false" => false
-        case illegal =>
-          val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
-                             "load DDL which you set can only be 'true' or 'false', please check " +
-                             "your input DDL."
-          throw new MalformedCarbonCommandException(errorMessage)
-      }
       val maxColumns = options.getOrElse("maxcolumns", null)
       carbonLoadModel.setMaxColumns(maxColumns)
       carbonLoadModel.setEscapeChar(escapeChar)
@@ -451,7 +433,7 @@ case class LoadTable(
       // set local dictionary path, and dictionary file extension
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
-      var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -521,7 +503,7 @@ case class LoadTable(
         throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
                                                   "string.")
       } else {
-        var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
+        val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
           val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
           val columnName = dateFormatSplits(0).trim.toLowerCase
@@ -667,7 +649,6 @@ private[sql] case class DescribeCommandFormatted(
           relation.tableMeta.carbonTableIdentifier.getTableName,
           field.name)
         if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
-          val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
           colProps.append(field.name).append(".")
             .append(mapper.writeValueAsString(dimension.getColumnProperties))
             .append(",")
@@ -679,7 +660,7 @@ private[sql] case class DescribeCommandFormatted(
           "KEY COLUMN"
         }
       } else {
-        ("MEASURE")
+        "MEASURE"
       }
       (field.name, field.dataType.simpleString, comment)
     }


