carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/4] incubator-carbondata git commit: Improved spark module code. * Removed some compilation warnings. * Replaced pattern matching on booleans with if-else. * Improved code according to Scala standards. * Removed unnecessary new lines. * Added string interpolation
Date Sat, 19 Nov 2016 02:16:50 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master c5176f31e -> 0a8e782ff
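
For readers skimming the diff, here is a minimal, hypothetical sketch of the conventions this commit applies across the spark module (the names below are illustrative, not taken from the CarbonData sources): parameterless defs for pure accessors, string interpolation instead of concatenation, and plain if-else instead of pattern matching on a Boolean.

object StyleSketch {
  val dbName = "default"
  val tableName = "t1"
  val isDimension = true

  // Before: empty parens on a pure accessor, string concatenation,
  // and a pattern match on a Boolean value.
  def qualifiedNameOld(): String = dbName + "." + tableName
  val kindOld: String = isDimension match {
    case true => "dimension"
    case false => "measure"
  }

  // After: parameterless accessor, string interpolation, plain if-else.
  def qualifiedName: String = s"$dbName.$tableName"
  val kind: String = if (isDimension) "dimension" else "measure"
}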


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
index 0a35b21..89e1aa9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala
@@ -40,20 +40,18 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
 
   override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = {
 
-    val values = carbonRowInstance.getValues().toSeq.map { value =>
-      value match {
-        case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
-        case d: java.math.BigDecimal =>
-          val javaDecVal = new java.math.BigDecimal(d.toString())
-          val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
-          val decConverter = new org.apache.spark.sql.types.Decimal()
-          decConverter.set(scalaDecVal)
-        case _ => value
-      }
+    val values = carbonRowInstance.getValues.toSeq.map {
+      case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s)
+      case d: java.math.BigDecimal =>
+        val javaDecVal = new java.math.BigDecimal(d.toString)
+        val scalaDecVal = new scala.math.BigDecimal(javaDecVal)
+        val decConverter = new org.apache.spark.sql.types.Decimal()
+        decConverter.set(scalaDecVal)
+      case value => value
     }
     try {
       val result = evaluateExpression(
-          new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
+        new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray))
       val sparkRes = if (isExecutor) {
         result.asInstanceOf[InternalRow].get(0, sparkExp.dataType)
       } else {
@@ -62,17 +60,16 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
       new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType),
         sparkRes
       )
-    }
-    catch {
-      case e: Exception => throw new FilterUnsupportedException(e.getMessage())
+    } catch {
+      case e: Exception => throw new FilterUnsupportedException(e.getMessage)
     }
   }
 
-  override def getFilterExpressionType(): ExpressionType = {
+  override def getFilterExpressionType: ExpressionType = {
     ExpressionType.UNKNOWN
   }
 
-  override def getString(): String = {
+  override def getString: String = {
     sparkExp.toString()
   }
 
@@ -81,46 +78,45 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
     isExecutor = true
   }
 
-  def getColumnList(): java.util.List[ColumnExpression] = {
+  def getColumnList: java.util.List[ColumnExpression] = {
 
     val lst = new java.util.ArrayList[ColumnExpression]()
     getColumnListFromExpressionTree(sparkExp, lst)
     lst
   }
-    def getLiterals(): java.util.List[ExpressionResult] = {
+  def getLiterals: java.util.List[ExpressionResult] = {
 
     val lst = new java.util.ArrayList[ExpressionResult]()
     lst
   }
 
-  def getAllColumnList(): java.util.List[ColumnExpression] = {
+  def getAllColumnList: java.util.List[ColumnExpression] = {
     val lst = new java.util.ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     lst
   }
 
-  def isSingleDimension(): Boolean = {
+  def isSingleDimension: Boolean = {
     val lst = new java.util.ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     if (lst.size == 1 && lst.get(0).isDimension) {
       true
-    }
-    else {
+    } else {
       false
     }
   }
 
   def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-    list: java.util.List[ColumnExpression]): Unit = {
+      list: java.util.List[ColumnExpression]): Unit = {
     sparkCurrentExp match {
       case carbonBoundRef: CarbonBoundReference =>
         val foundExp = list.asScala
-          .find(p => p.getColumnName() == carbonBoundRef.colExp.getColumnName())
+          .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName)
         if (foundExp.isEmpty) {
           carbonBoundRef.colExp.setColIndex(list.size)
           list.add(carbonBoundRef.colExp)
         } else {
-          carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex())
+          carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex)
         }
       case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
     }
@@ -128,7 +124,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
 
 
   def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression,
-    list: List[ColumnExpression]): List[ColumnExpression] = {
+      list: List[ColumnExpression]): List[ColumnExpression] = {
     sparkCurrentExp match {
       case carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp)
       case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list))
@@ -136,13 +132,12 @@ class SparkUnknownExpression(var sparkExp: SparkExpression)
     list
   }
 
-  def isDirectDictionaryColumns(): Boolean = {
+  def isDirectDictionaryColumns: Boolean = {
     val lst = new ArrayList[ColumnExpression]()
     getAllColumnListFromExpressionTree(sparkExp, lst)
     if (lst.get(0).getCarbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
       true
-    }
-    else {
+    } else {
       false
     }
   }
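
The first hunk above drops the redundant "value => value match { ... }" wrapper and passes a partial-function literal straight to map. A hedged sketch of the same pattern with hypothetical data:

object MapRefactorSketch {
  val raw: Seq[Any] = Seq("abc", java.math.BigDecimal.ONE, 42)

  // Before: bind the element, then immediately match on it.
  val convertedOld: Seq[Any] = raw.map { value =>
    value match {
      case s: String => s.toUpperCase
      case d: java.math.BigDecimal => BigDecimal(d)
      case _ => value
    }
  }

  // After: the partial-function literal is the mapping function itself;
  // the last case binds the element to keep the pass-through behaviour.
  val converted: Seq[Any] = raw.map {
    case s: String => s.toUpperCase
    case d: java.math.BigDecimal => BigDecimal(d)
    case other => other
  }
}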

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ed757e3..a6b4ec5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -45,7 +45,8 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
 import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension,
+ColumnSchema}
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
@@ -61,7 +62,8 @@ import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
+GlobalDictionaryUtil}
 
 case class tableModel(
     ifNotExistsSet: Boolean,
@@ -166,18 +168,24 @@ case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
 
 case class AlterTableModel(dbName: Option[String], tableName: String,
-  compactionType: String, alterSql: String)
+    compactionType: String, alterSql: String)
 
 case class CompactionModel(compactionSize: Long,
-  compactionType: CompactionType,
-  carbonTable: CarbonTable,
-  tableCreationTime: Long,
-  isDDLTrigger: Boolean)
+    compactionType: CompactionType,
+    carbonTable: CarbonTable,
+    tableCreationTime: Long,
+    isDDLTrigger: Boolean)
 
-case class CompactionCallableModel(storePath: String, carbonLoadModel: CarbonLoadModel,
-  partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
-  cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
-  compactionType: CompactionType)
+case class CompactionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    partitioner: Partitioner,
+    storeLocation: String,
+    carbonTable: CarbonTable,
+    kettleHomePath: String,
+    cubeCreationTime: Long,
+    loadsToMerge: util.List[LoadMetadataDetails],
+    sqlContext: SQLContext,
+    compactionType: CompactionType)
 
 object TableNewProcessor {
   def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
@@ -189,6 +197,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
 
   var index = 0
   var rowGroup = 0
+
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
     fieldChildren.foreach(fields => {
@@ -294,12 +303,12 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
     // Its based on the dimension name and measure name
     allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate column found with name : $name")
+      LOGGER.error(s"Duplicate column found with name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName}" +
-        s"Duplicate column found with name : $name")
-      sys.error(s"Duplicate dimensions found with name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName }" +
+        s"Duplicate column found with name: $name")
+      sys.error(s"Duplicate dimensions found with name: $name")
     })
 
     val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
@@ -314,14 +323,11 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
     for (column <- allColumns) {
       if (highCardinalityDims.contains(column.getColumnName)) {
         newOrderedDims += column
-      }
-      else if (column.isComplex) {
+      } else if (column.isComplex) {
         complexDims += column
-      }
-      else if (column.isDimensionColumn) {
+      } else if (column.isDimensionColumn) {
         newOrderedDims += column
-      }
-      else {
+      } else {
         measures += column
       }
 
@@ -333,7 +339,7 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
       // When the column is measure or the specified no inverted index column in DDL,
       // set useInvertedIndex to false, otherwise true.
       if (noInvertedIndexCols.contains(column.getColumnName) ||
-        cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
+          cm.msrCols.exists(_.column.equalsIgnoreCase(column.getColumnName))) {
         column.setUseInvertedIndex(false)
       } else {
         column.setUseInvertedIndex(true)
@@ -378,25 +384,22 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
             Partitioner(
               "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
               Array(""), part.partitionCount, null)
-          }
-          else {
+          } else {
             // case where partition cols are set and partition class is not set.
             // so setting the default value.
             Partitioner(
               "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
               part.partitionColumn, part.partitionCount, null)
           }
-        }
-        else if (definedpartCols.nonEmpty) {
+        } else if (definedpartCols.nonEmpty) {
           val msg = definedpartCols.mkString(", ")
-          LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+          LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
           LOGGER.audit(
             s"Validation failed for Create/Alter Table Operation for " +
-              s"${cm.databaseName}.${cm.tableName} " +
-            s"partition columns specified are not part of Dimension columns : $msg")
-          sys.error(s"partition columns specified are not part of Dimension columns : $msg")
-        }
-        else {
+            s"${ cm.databaseName }.${ cm.tableName } " +
+            s"partition columns specified are not part of Dimension columns: $msg")
+          sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+        } else {
 
           try {
             Class.forName(part.partitionClass).newInstance()
@@ -405,9 +408,9 @@ class TableNewProcessor(cm: tableModel, sqlContext: SQLContext) {
               val cl = part.partitionClass
               LOGGER.audit(
                 s"Validation failed for Create/Alter Table Operation for " +
-                  s"${cm.databaseName}.${cm.tableName} " +
-                s"partition class specified can not be found or loaded : $cl")
-              sys.error(s"partition class specified can not be found or loaded : $cl")
+                s"${ cm.databaseName }.${ cm.tableName } " +
+                s"partition class specified can not be found or loaded: $cl")
+              sys.error(s"partition class specified can not be found or loaded: $cl")
           }
 
           Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -578,42 +581,42 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
     // Its based on the dimension name and measure name
     levels.groupBy(_.name).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate dimensions found with name : $name")
+      LOGGER.error(s"Duplicate dimensions found with name: $name")
       LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate dimensions found with name : $name")
-      sys.error(s"Duplicate dimensions found with name : $name")
+        "Validation failed for Create/Alter Table Operation " +
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate dimensions found with name: $name")
+      sys.error(s"Duplicate dimensions found with name: $name")
     })
 
     levels.groupBy(_.column).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate dimensions found with column name : $name")
+      LOGGER.error(s"Duplicate dimensions found with column name: $name")
       LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate dimensions found with column name : $name")
-      sys.error(s"Duplicate dimensions found with column name : $name")
+        "Validation failed for Create/Alter Table Operation " +
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate dimensions found with column name: $name")
+      sys.error(s"Duplicate dimensions found with column name: $name")
     })
 
     measures.groupBy(_.name).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate measures found with name : $name")
+      LOGGER.error(s"Duplicate measures found with name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate measures found with name : $name")
-      sys.error(s"Duplicate measures found with name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate measures found with name: $name")
+      sys.error(s"Duplicate measures found with name: $name")
     })
 
     measures.groupBy(_.column).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Duplicate measures found with column name : $name")
+      LOGGER.error(s"Duplicate measures found with column name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Duplicate measures found with column name : $name")
-      sys.error(s"Duplicate measures found with column name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Duplicate measures found with column name: $name")
+      sys.error(s"Duplicate measures found with column name: $name")
     })
 
     val levelsArray = levels.map(_.name)
@@ -625,7 +628,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
         LOGGER.error(s"Aggregator should not be defined for dimension fields [$fault]")
         LOGGER.audit(
           s"Validation failed for Create/Alter Table Operation for " +
-            s"${cm.databaseName}.${cm.tableName} " +
+          s"${ cm.databaseName }.${ cm.tableName } " +
           s"Aggregator should not be defined for dimension fields [$fault]")
         sys.error(s"Aggregator should not be defined for dimension fields [$fault]")
       }
@@ -633,12 +636,12 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
 
     levelsNdMesures.groupBy(x => x).foreach(f => if (f._2.size > 1) {
       val name = f._1
-      LOGGER.error(s"Dimension and Measure defined with same name : $name")
+      LOGGER.error(s"Dimension and Measure defined with same name: $name")
       LOGGER.audit(
         s"Validation failed for Create/Alter Table Operation " +
-        s"for ${cm.databaseName}.${cm.tableName} " +
-        s"Dimension and Measure defined with same name : $name")
-      sys.error(s"Dimension and Measure defined with same name : $name")
+        s"for ${ cm.databaseName }.${ cm.tableName } " +
+        s"Dimension and Measure defined with same name: $name")
+      sys.error(s"Dimension and Measure defined with same name: $name")
     })
 
     dimSrcDimensions.foreach(d => {
@@ -677,8 +680,7 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
           val matchedMapping = aggs.filter(agg => f.name.equals(agg.msrName))
           if (matchedMapping.isEmpty) {
             f
-          }
-          else {
+          } else {
             Measure(f.name, f.column, f.dataType, matchedMapping.head.aggType)
           }
         }
@@ -708,17 +710,14 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
           Partitioner(
             "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl",
             Array(""), part.partitionCount, null)
-        }
-        else if (definedpartCols.nonEmpty) {
+        } else if (definedpartCols.nonEmpty) {
           val msg = definedpartCols.mkString(", ")
-          LOGGER.error(s"partition columns specified are not part of Dimension columns : $msg")
+          LOGGER.error(s"partition columns specified are not part of Dimension columns: $msg")
           LOGGER.audit(
             s"Validation failed for Create/Alter Table Operation - " +
-            s"partition columns specified are not part of Dimension columns : $msg")
-          sys.error(s"partition columns specified are not part of Dimension columns : $msg")
-        }
-        else {
-
+            s"partition columns specified are not part of Dimension columns: $msg")
+          sys.error(s"partition columns specified are not part of Dimension columns: $msg")
+        } else {
           try {
             Class.forName(part.partitionClass).newInstance()
           } catch {
@@ -726,9 +725,9 @@ class TableProcessor(cm: tableModel, sqlContext: SQLContext) {
               val cl = part.partitionClass
               LOGGER.audit(
                 s"Validation failed for Create/Alter Table Operation for " +
-                  s"${cm.databaseName}.${cm.tableName} " +
-                s"partition class specified can not be found or loaded : $cl")
-              sys.error(s"partition class specified can not be found or loaded : $cl")
+                s"${ cm.databaseName }.${ cm.tableName } " +
+                s"partition class specified can not be found or loaded: $cl")
+              sys.error(s"partition class specified can not be found or loaded: $cl")
           }
 
           Partitioner(part.partitionClass, columnBuffer.toArray, part.partitionCount, null)
@@ -782,8 +781,8 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
     val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(databaseName + "_" + tableName)) {
-      logError("alter table failed. table not found: " + databaseName + "." + tableName)
-      sys.error("alter table failed. table not found: " + databaseName + "." + tableName)
+      logError(s"alter table failed. table not found: $databaseName.$tableName")
+      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
     }
 
     val relation =
@@ -824,17 +823,14 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e
           kettleHomePath,
           storeLocation
         )
-    }
-    catch {
+    } catch {
       case e: Exception =>
         if (null != e.getMessage) {
-          sys.error("Compaction failed. Please check logs for more info." + e.getMessage)
-        }
-        else {
+          sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+        } else {
           sys.error("Exception in compaction. Please check logs for more info.")
         }
     }
-
     Seq.empty
   }
 }
@@ -861,9 +857,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
           s"Table [$tbName] already exists under database [$dbName]")
         sys.error(s"Table [$tbName] already exists under database [$dbName]")
       }
-    }
-    else {
-
+    } else {
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
       // Need to fill partitioner class when we support partition
@@ -872,7 +866,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
         sqlContext.sql(
           s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
           s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
-              .collect
+          .collect
       } catch {
         case e: Exception =>
           val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
@@ -882,7 +876,7 @@ case class CreateTable(cm: tableModel) extends RunnableCommand {
             .dropTable(catalog.storePath, identifier)(sqlContext)
 
           LOGGER.audit(s"Table creation with Database name [$dbName] " +
-            s"and Table name [$tbName] failed")
+                       s"and Table name [$tbName] failed")
           throw e
       }
 
@@ -940,8 +934,8 @@ private[sql] case class DeleteLoadsById(
         LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.")
       }
       else {
-        sys.error("Delete segment by Id is failed. Invalid ID is :"
-                  + invalidLoadIds.mkString(","))
+        sys.error("Delete segment by Id is failed. Invalid ID is:" +
+                  s" ${ invalidLoadIds.mkString(",") }")
       }
     } catch {
       case ex: Exception =>
@@ -963,10 +957,10 @@ private[sql] case class DeleteLoadsById(
 }
 
 private[sql] case class DeleteLoadsByLoadDate(
-   databaseNameOp: Option[String],
-  tableName: String,
-  dateField: String,
-  loadDate: String) extends RunnableCommand {
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
 
@@ -980,12 +974,12 @@ private[sql] case class DeleteLoadsByLoadDate(
     if (relation == null) {
       LOGGER
         .audit(s"Delete segment by load date is failed. Table $dbName.$tableName does not " +
-         s"exist")
+               s"exist")
       sys.error(s"Table $dbName.$tableName does not exist")
     }
 
     val timeObj = Cast(Literal(loadDate), TimestampType).eval()
-    if(null == timeObj) {
+    if (null == timeObj) {
       val errorMessage = "Error: Invalid load start time format " + loadDate
       throw new MalformedCarbonCommandException(errorMessage)
     }
@@ -1037,20 +1031,20 @@ case class LoadTable(
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     val identifier = TableIdentifier(tableName, Option(dbName))
     if (isOverwriteExist) {
-      sys.error("Overwrite is not supported for carbon table with " + dbName + "." + tableName)
+      sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
     if (null == org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(dbName + "_" + tableName)) {
-      logError("Data loading failed. table not found: " + dbName + "." + tableName)
-      LOGGER.audit("Data loading failed. table not found: " + dbName + "." + tableName)
-      sys.error("Data loading failed. table not found: " + dbName + "." + tableName)
+      logError(s"Data loading failed. table not found: $dbName.$tableName")
+      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
     val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
-        .lookupRelation1(Option(dbName), tableName)(sqlContext)
-        .asInstanceOf[CarbonRelation]
+      .lookupRelation1(Option(dbName), tableName)(sqlContext)
+      .asInstanceOf[CarbonRelation]
     if (relation == null) {
-        sys.error(s"Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
     }
     CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
@@ -1066,8 +1060,13 @@ case class LoadTable(
         sys.error("Table is locked for updation. Please try after some time")
       }
 
-      val factPath = if (dataFrame.isDefined) "" else FileUtils.getPaths(
-        CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      val factPath = if (dataFrame.isDefined) {
+        ""
+      }
+      else {
+        FileUtils.getPaths(
+          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
+      }
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
       carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
@@ -1127,7 +1126,8 @@ case class LoadTable(
         case "false" => false
         case illegal =>
           val errorMessage = "Illegal syntax found: [" + illegal + "] .The value multiline in " +
-            "load DDL which you set can only be 'true' or 'false', please check your input DDL."
+                             "load DDL which you set can only be 'true' or 'false', please check " +
+                             "your input DDL."
           throw new MalformedCarbonCommandException(errorMessage)
       }
       val maxColumns = options.getOrElse("maxcolumns", null)
@@ -1165,8 +1165,7 @@ case class LoadTable(
         // First system has to partition the data first and then call the load data
         if (null == relation.tableMeta.partitioner.partitionColumn ||
             relation.tableMeta.partitioner.partitionColumn(0).isEmpty) {
-          LOGGER.info("Initiating Direct Load for the Table : (" +
-                      dbName + "." + tableName + ")")
+          LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
           carbonLoadModel.setFactFilePath(factPath)
           carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
           carbonLoadModel.setCsvHeader(fileHeader)
@@ -1185,14 +1184,12 @@ case class LoadTable(
             partitionStatus,
             useKettle,
             dataFrame)
-      }
-      catch {
+      } catch {
         case ex: Exception =>
           LOGGER.error(ex)
           LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
-      }
-      finally {
+      } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
           val fileType = FileFactory.getFileType(partitionLocation)
@@ -1205,7 +1202,7 @@ case class LoadTable(
           case ex: Exception =>
             LOGGER.error(ex)
             LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
-              "Problem deleting the partition folder")
+                         "Problem deleting the partition folder")
             throw ex
         }
 
@@ -1229,12 +1226,12 @@ case class LoadTable(
     Seq.empty
   }
 
-  private  def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
+  private def validateDateFormat(dateFormat: String, table: CarbonTable): Unit = {
     val dimensions = table.getDimensionByTableName(tableName).asScala
     if (dateFormat != null) {
       if (dateFormat.trim == "") {
         throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-          "string.")
+                                                  "string.")
       } else {
         var dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
@@ -1242,11 +1239,13 @@ case class LoadTable(
           val columnName = dateFormatSplits(0).trim.toLowerCase
           if (!dimensions.exists(_.getColName.equals(columnName))) {
             throw new MalformedCarbonCommandException("Error: Wrong Column Name " +
-              dateFormatSplits(0) + " is provided in Option DateFormat.")
+                                                      dateFormatSplits(0) +
+                                                      " is provided in Option DateFormat.")
           }
           if (dateFormatSplits.length < 2 || dateFormatSplits(1).trim.isEmpty) {
             throw new MalformedCarbonCommandException("Error: Option DateFormat is not provided " +
-              "for " + "Column " + dateFormatSplits(0) + ".")
+                                                      "for " + "Column " + dateFormatSplits(0) +
+                                                      ".")
           }
         }
       }
@@ -1279,8 +1278,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       CarbonEnv.getInstance(sqlContext).carbonCatalog.dropTable(storePath, identifier)(sqlContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
-    }
-    finally {
+    } finally {
       if (carbonLock != null && isLocked) {
         if (carbonLock.unlock()) {
           logInfo("Table MetaData Unlocked Successfully after dropping the table")
@@ -1293,7 +1291,7 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
           // delete bad record log after drop table
-          val badLogPath = CarbonUtil.getBadLogPath(dbName +  File.separator + tableName)
+          val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
           val badLogFileType = FileFactory.getFileType(badLogPath)
           if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
             val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
@@ -1353,8 +1351,7 @@ private[sql] case class ShowLoads(
         try {
           val lim = Integer.parseInt(limitLoads)
           loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
-        }
-        catch {
+        } catch {
           case ex: NumberFormatException => sys.error(s" Entered limit is not a valid Number")
         }
 
@@ -1389,13 +1386,13 @@ private[sql] case class DescribeCommandFormatted(
     var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
       val comment = if (relation.metaData.dims.contains(field.name)) {
         val dimension = relation.metaData.carbonTable.getDimensionByName(
-            relation.tableMeta.carbonTableIdentifier.getTableName,
-            field.name)
+          relation.tableMeta.carbonTableIdentifier.getTableName,
+          field.name)
         if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
           val colprop = mapper.writeValueAsString(dimension.getColumnProperties)
           colProps.append(field.name).append(".")
-          .append(mapper.writeValueAsString(dimension.getColumnProperties))
-          .append(",")
+            .append(mapper.writeValueAsString(dimension.getColumnProperties))
+            .append(",")
         }
         if (dimension.hasEncoding(Encoding.DICTIONARY) &&
             !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
@@ -1415,11 +1412,11 @@ private[sql] case class DescribeCommandFormatted(
       colProps.toString()
     }
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name : ", relation.tableMeta.carbonTableIdentifier
+    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
       .getDatabaseName, "")
     )
-    results ++= Seq(("Table Name : ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
-    results ++= Seq(("CARBON Store Path : ", relation.tableMeta.storePath, ""))
+    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
+    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
     val carbonTable = relation.tableMeta.carbonTable
     results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
@@ -1438,7 +1435,7 @@ private[sql] case class DescribeCommandFormatted(
 
   private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
     var results: Seq[(String, String, String)] =
-        Seq(("", "", ""), ("##Column Group Information", "", ""))
+      Seq(("", "", ""), ("##Column Group Information", "", ""))
     val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
       case (groupId, _) => groupId != -1
     }.toSeq.sortBy(_._1)
@@ -1447,7 +1444,7 @@ private[sql] case class DescribeCommandFormatted(
     })
     var index = 1
     groups.map { x =>
-      results = results:+(s"Column Group $index", x, "")
+      results = results :+ (s"Column Group $index", x, "")
       index = index + 1
     }
     results
@@ -1464,7 +1461,6 @@ private[sql] case class DeleteLoadByDate(
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def run(sqlContext: SQLContext): Seq[Row] = {
-
     val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
     LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
     val identifier = TableIdentifier(tableName, Option(dbName))
@@ -1472,26 +1468,21 @@ private[sql] case class DeleteLoadByDate(
       .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
     var level: String = ""
     val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata
-         .getInstance().getCarbonTable(dbName + '_' + tableName)
+      .getInstance().getCarbonTable(dbName + '_' + tableName)
     if (relation == null) {
       LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-
     val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
       filter => filter.name.equalsIgnoreCase(dateField) &&
                 filter.dataType.isInstanceOf[TimestampType]).toList
-
     if (matches.isEmpty) {
-      LOGGER.audit(
-        "The delete load by date is failed. " +
-        "Table $dbName.$tableName does not contain date field :" + dateField)
-      sys.error(s"Table $dbName.$tableName does not contain date field " + dateField)
-    }
-    else {
+      LOGGER.audit("The delete load by date is failed. " +
+                   s"Table $dbName.$tableName does not contain date field: $dateField")
+      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
+    } else {
       level = matches.asJava.get(0).name
     }
-
     val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
       .getColName
     CarbonDataRDDFactory.deleteLoadByDate(
@@ -1507,6 +1498,7 @@ private[sql] case class DeleteLoadByDate(
     LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
     Seq.empty
   }
+
 }
 
 private[sql] case class CleanFiles(
@@ -1544,7 +1536,7 @@ private[sql] case class CleanFiles(
         relation.tableMeta.partitioner)
       LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.")
     } catch {
-      case ex : Exception =>
+      case ex: Exception =>
         sys.error(ex.getMessage)
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index d551e10..3fe62cc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -55,9 +55,9 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
 case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
 
 case class CarbonMetaData(dims: Seq[String],
-  msrs: Seq[String],
-  carbonTable: CarbonTable,
-  dictionaryMap: DictionaryMap)
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
 
 case class TableMeta(carbonTableIdentifier: CarbonTableIdentifier, storePath: String,
     var carbonTable: CarbonTable, partitioner: Partitioner)
@@ -176,12 +176,12 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       ZookeeperInit.getInstance(zookeeperUrl)
       LOGGER.info("Zookeeper url is configured. Taking the zookeeper as lock type.")
       var configuredLockType = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE)
+        .getProperty(CarbonCommonConstants.LOCK_TYPE)
       if (null == configuredLockType) {
         configuredLockType = CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER
         CarbonProperties.getInstance
-            .addProperty(CarbonCommonConstants.LOCK_TYPE,
-                configuredLockType)
+          .addProperty(CarbonCommonConstants.LOCK_TYPE,
+            configuredLockType)
       }
     }
 
@@ -214,7 +214,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
             tableFolders.foreach(tableFolder => {
               if (tableFolder.isDirectory) {
                 val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
-                    tableFolder.getName, UUID.randomUUID().toString)
+                  tableFolder.getName, UUID.randomUUID().toString)
                 val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
                   carbonTableIdentifier)
                 val tableMetadataFile = carbonTablePath.getSchemaFilePath
@@ -260,22 +260,17 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
             })
           }
         })
-      }
-      else {
+      } else {
         // Create folders and files.
         FileFactory.mkdirs(databasePath, fileType)
-
       }
-    }
-    catch {
+    } catch {
       case s: java.io.FileNotFoundException =>
         // Create folders and files.
         FileFactory.mkdirs(databasePath, fileType)
-
     }
   }
 
-
   /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
@@ -286,11 +281,9 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       tableInfo: org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo,
       dbName: String, tableName: String, partitioner: Partitioner)
     (sqlContext: SQLContext): String = {
-
     if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
       sys.error(s"Table [$tableName] already exists under Database [$dbName]")
     }
-
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val thriftTableInfo = schemaConverter
       .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
@@ -299,14 +292,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
       .add(schemaEvolutionEntry)
 
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-        tableInfo.getFactTable.getTableId)
+      tableInfo.getFactTable.getTableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     tableInfo.setMetaDataFilepath(schemaMetadataPath)
     tableInfo.setStorePath(storePath)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-
     val tableMeta = TableMeta(
       carbonTableIdentifier,
       storePath,
@@ -318,15 +310,13 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
     }
-
     val thriftWriter = new ThriftWriter(schemaFilePath, false)
     thriftWriter.open()
     thriftWriter.write(thriftTableInfo)
     thriftWriter.close()
-
     metadata.tablesMeta += tableMeta
     logInfo(s"Table $tableName for Database $dbName created successfully.")
-    LOGGER.info("Table " + tableName + " for Database " + dbName + " created successfully.")
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
     updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
     carbonTablePath.getPath
   }
@@ -392,8 +382,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
           if (c.carbonTableIdentifier.getDatabaseName.contains(name)) {
             c.carbonTableIdentifier
               .getDatabaseName
-          }
-          else {
+          } else {
             null
           }
         case _ => c.carbonTableIdentifier.getDatabaseName
@@ -420,8 +409,8 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
   def getAllTables()(sqlContext: SQLContext): Seq[TableIdentifier] = {
     checkSchemasModifiedTimeAndReloadTables()
     metadata.tablesMeta.map { c =>
-        TableIdentifier(c.carbonTableIdentifier.getTableName,
-          Some(c.carbonTableIdentifier.getDatabaseName))
+      TableIdentifier(c.carbonTableIdentifier.getTableName,
+        Some(c.carbonTableIdentifier.getDatabaseName))
     }
   }
 
@@ -526,7 +515,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
-        tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
         refreshCache()
       }
     }
@@ -636,18 +625,18 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath: String,
 object CarbonMetastoreTypes extends RegexParsers {
   protected lazy val primitiveType: Parser[DataType] =
     "string" ^^^ StringType |
-      "float" ^^^ FloatType |
-      "int" ^^^ IntegerType |
-      "tinyint" ^^^ ShortType |
-      "short" ^^^ ShortType |
-      "double" ^^^ DoubleType |
-      "long" ^^^ LongType |
-      "binary" ^^^ BinaryType |
-      "boolean" ^^^ BooleanType |
-      fixedDecimalType |
-      "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
-      "varchar\\((\\d+)\\)".r ^^^ StringType |
-      "timestamp" ^^^ TimestampType
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "timestamp" ^^^ TimestampType
 
   protected lazy val fixedDecimalType: Parser[DataType] =
     "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {

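The CarbonMetastoreCatalog hunk above only re-indents the parser alternation in CarbonMetastoreTypes, but the combinators may be unfamiliar: ^^^ replaces a successful parse result with a constant, and | tries the alternatives in order. A minimal sketch, assuming the scala-parser-combinators RegexParsers API that the file already imports (the type names here are hypothetical):

import scala.util.parsing.combinator.RegexParsers

object TypeNameParserSketch extends RegexParsers {
  sealed trait Kind
  case object StringKind extends Kind
  case object NumericKind extends Kind

  // Each alternative parses a keyword and maps it to a constant result with ^^^.
  lazy val kind: Parser[Kind] =
    "string" ^^^ StringKind |
    "int" ^^^ NumericKind |
    "double" ^^^ NumericKind

  def main(args: Array[String]): Unit = {
    println(parseAll(kind, "double")) // prints something like: [1.7] parsed: NumericKind
  }
}
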
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 81abbfb..0c13293 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
 object DistributionUtil {
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
+
   /*
    * This method will return the list of executers in the cluster.
    * For this we take the  memory status of all node with getExecutorMemoryStatus
@@ -62,13 +63,11 @@ object DistributionUtil {
           addr.getHostName
         }
         nodeNames.toArray
-      }
-      else {
+      } else {
         // For Standalone cluster, node IPs will be returned.
         nodelist.toArray
       }
-    }
-    else {
+    } else {
       Seq(InetAddress.getLocalHost.getHostName).toArray
     }
   }
@@ -111,37 +110,41 @@ object DistributionUtil {
    * @return
    */
   def ensureExecutorsAndGetNodeList(blockList: Array[Distributable],
-    sparkContext: SparkContext):
+      sparkContext: SparkContext):
   Array[String] = {
     val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
     var confExecutorsTemp: String = null
     if (sparkContext.getConf.contains("spark.executor.instances")) {
       confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
     } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
-      && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-      .equalsIgnoreCase("true")) {
+               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+                 .equalsIgnoreCase("true")) {
       if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
         confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
       }
     }
 
-    val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+    val confExecutors = if (null != confExecutorsTemp) {
+      confExecutorsTemp.toInt
+    } else {
+      1
+    }
     val requiredExecutors = if (nodeMapping.size > confExecutors) {
       confExecutors
-    } else {nodeMapping.size()}
+    } else { nodeMapping.size() }
 
-    val startTime = System.currentTimeMillis();
+    val startTime = System.currentTimeMillis()
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
-    var maxTimes = 30;
+    var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {
-      Thread.sleep(500);
+      Thread.sleep(500)
       nodes = DistributionUtil.getNodeList(sparkContext)
-      maxTimes = maxTimes - 1;
+      maxTimes = maxTimes - 1
     }
-    val timDiff = System.currentTimeMillis() - startTime;
-    LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
-    LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) * 500)
+    val timDiff = System.currentTimeMillis() - startTime
+    LOGGER.info(s"Total Time taken to ensure the required executors: $timDiff")
+    LOGGER.info(s"Time elapsed to allocate the required executors: ${ (30 - maxTimes) * 500 }")
     nodes.distinct
   }
 }
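
DistributionUtil above waits for the requested executors with a bounded polling loop, and its "time elapsed" log line is derived as (retry budget - retries left) * sleep interval. A hedged, self-contained sketch of that pattern (the names are illustrative, not CarbonData APIs):

object BoundedWaitSketch {
  // Poll until the condition holds or the retry budget runs out; return the
  // approximate time spent waiting, in milliseconds.
  def waitUntil(condition: () => Boolean, maxTries: Int = 30, intervalMs: Long = 500): Long = {
    var remaining = maxTries
    while (!condition() && remaining > 0) {
      Thread.sleep(intervalMs)
      remaining -= 1
    }
    (maxTries - remaining) * intervalMs
  }

  def main(args: Array[String]): Unit = {
    val deadline = System.currentTimeMillis() + 1200
    val waited = waitUntil(() => System.currentTimeMillis() >= deadline)
    println(s"waited roughly $waited ms")
  }
}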

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 368a1ad..d60bed4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -75,7 +75,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (relations.nonEmpty && !isOptimized(plan)) {
       LOGGER.info("Starting to optimize plan")
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("");
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
       val queryStatistic = new QueryStatistic()
       val result = transformCarbonPlan(plan, relations)
       queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
@@ -99,8 +99,8 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
   case class ExtraNodeInfo(var hasCarbonRelation: Boolean)
 
   def fillNodeInfo(
-       plan: LogicalPlan,
-       extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
+      plan: LogicalPlan,
+      extraNodeInfos: java.util.HashMap[LogicalPlan, ExtraNodeInfo]): ExtraNodeInfo = {
     plan match {
       case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
         val extraNodeInfo = ExtraNodeInfo(true)
@@ -465,9 +465,10 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           decoder = true
           cd
         case currentPlan =>
-          hasCarbonRelation(currentPlan) match {
-            case true => addTempDecoder(currentPlan)
-            case false => currentPlan
+          if (hasCarbonRelation(currentPlan)) {
+            addTempDecoder(currentPlan)
+          } else {
+            currentPlan
           }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6391c2be/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index b683629..e755b2e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -29,19 +29,18 @@ object FileUtils extends Logging {
    * append all csv file path to a String, file path separated by comma
    */
   private def getPathsFromCarbonFile(carbonFile: CarbonFile, stringBuild: StringBuilder): Unit = {
-    carbonFile.isDirectory match {
-    case true =>
+    if (carbonFile.isDirectory) {
       val files = carbonFile.listFiles()
       for (j <- 0 until files.size) {
         getPathsFromCarbonFile(files(j), stringBuild)
       }
-    case false =>
+    } else {
       val path = carbonFile.getAbsolutePath
       val fileName = carbonFile.getName
       if (carbonFile.getSize == 0) {
         logWarning(s"skip empty input file: $path")
       } else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
-          fileName.startsWith(CarbonCommonConstants.POINT)) {
+                 fileName.startsWith(CarbonCommonConstants.POINT)) {
         logWarning(s"skip invisible input file: $path")
       } else {
         stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
@@ -71,7 +70,7 @@ object FileUtils extends Logging {
         stringBuild.substring(0, stringBuild.size - 1)
       } else {
         throw new DataLoadingException("Please check your input path and make sure " +
-          "that files end with '.csv' and content is not empty.")
+                                       "that files end with '.csv' and content is not empty.")
       }
     }
   }
@@ -90,4 +89,5 @@ object FileUtils extends Logging {
       size
     }
   }
+
 }

