carbondata-commits mailing list archives

From jack...@apache.org
Subject [25/38] incubator-carbondata git commit: reuse test case for integration module
Date Sat, 07 Jan 2017 16:36:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
new file mode 100644
index 0000000..0deb14e
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/*
+ * Test class for querying CarbonData files written in the old (V1) format
+ */
+class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1")
+    sql("drop table if exists OldFormatTable")
+    sql("drop table if exists OldFormatTableHIVE")
+    sql("""
+           CREATE TABLE IF NOT EXISTS OldFormatTable
+           (country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED BY 'carbondata'
+           """)
+    sql("""
+           CREATE TABLE IF NOT EXISTS OldFormatTableHIVE
+           (country String,
+           name String, phonetype String, serialname String, salary Int)
+          row format delimited fields terminated by ','
+           """)      
+    sql(s"LOAD DATA local inpath '$resourcesPath/OLDFORMATTABLE.csv' INTO table OldFormatTable")
+   sql(s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/OLDFORMATTABLEHIVE.csv' into table OldFormatTableHIVE
+           """)
+
+  }
+
+  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V2")
+  test("Test select * query") {
+    checkAnswer(
+      sql("select * from OldFormatTable"), sql("select * from OldFormatTableHIVE")
+    )
+  }
+
+  override def afterAll {
+     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1")
+    sql("drop table if exists OldFormatTable")
+    sql("drop table if exists OldFormatTableHIVE")
+  }
+
+}
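
A side note on the property handling above: the tests mutate the global CarbonProperties singleton and rely on afterAll to restore it. A scoped helper would make the save/restore explicit and exception-safe; the following is a hypothetical sketch (not part of this patch; it assumes only CarbonProperties.getProperty/addProperty, which the file already uses):

    // Hypothetical helper: run `body` with a CarbonProperties key temporarily
    // set to `value`, restoring the previous value (if any) afterwards.
    def withCarbonProperty[T](key: String, value: String)(body: => T): T = {
      val props = CarbonProperties.getInstance
      val previous = Option(props.getProperty(key))  // None if the key was unset
      props.addProperty(key, value)
      try body
      finally previous.foreach(props.addProperty(key, _))
    }

With it, the V1 loads in beforeAll could be wrapped as withCarbonProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1") { /* create tables and load */ }.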

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithoutDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithoutDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithoutDataLoad.scala
new file mode 100644
index 0000000..4c95633
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithoutDataLoad.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/*
+ * Test Class for query without data load
+ *
+ */
+class TestQueryWithoutDataLoad extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("drop table if exists no_load")
+    sql("""
+        CREATE TABLE no_load(imei string, age int, productdate timestamp, gamePointId double)
+        STORED BY 'org.apache.carbondata.format'
+        TBLPROPERTIES('DICTIONARY_EXCLUDE'='productdate', 'DICTIONARY_INCLUDE'='gamePointId')
+      """)
+  }
+
+  test("test query without data load") {
+    checkAnswer(
+      sql("select count(*) from no_load"), Seq(Row(0))
+    )
+    checkAnswer(
+      sql("select * from no_load"), Seq.empty
+    )
+    checkAnswer(
+      sql("select imei, count(age) from no_load group by imei"), Seq.empty
+    )
+    checkAnswer(
+      sql("select imei, sum(age) from no_load group by imei"), Seq.empty
+    )
+    checkAnswer(
+      sql("select imei, avg(age) from no_load group by imei"), Seq.empty
+    )
+  }
+
+  override def afterAll {
+    sql("drop table no_load")
+  }
+
+}
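
The distinction this test exercises is standard SQL semantics: a global aggregate over an empty table returns a single row (count(*) = 0), while a grouped aggregate returns no rows because there are no groups. The same behaviour can be checked against plain Spark; a minimal sketch, assuming a local SparkSession named spark:

    val empty = spark.range(0).toDF("imei")                  // an empty single-column table
    assert(empty.count() == 0)                               // no rows, so the count is 0
    assert(empty.groupBy("imei").count().collect().isEmpty)  // no groups => no result rows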

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestTableNameHasDbName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestTableNameHasDbName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestTableNameHasDbName.scala
new file mode 100644
index 0000000..7b43d46
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestTableNameHasDbName.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/*
+ * Test class for querying a table whose name contains the database name
+ */
+class TestTableNameHasDbName extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS tabledefault")
+    sql("CREATE TABLE tabledefault (empno int, workgroupcategory string, " +
+      "deptno int, projectcode int,attendance int)" +
+      " STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE tabledefault")
+  }
+
+  test("test query when part of tableName has dbName") {
+    try {
+      sql("SELECT * FROM tabledefault").collect()
+    } catch {
+      case ex: Exception =>
+        assert(false)
+    }
+  }
+
+  override def afterAll {
+    sql("DROP TABLE tabledefault")
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
new file mode 100644
index 0000000..4809a9d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.badrecordloger
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class for bad record handling and logging during data load
+ */
+class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    try {
+      sql("drop table IF EXISTS sales")
+      sql("drop table IF EXISTS serializable_values")
+      sql("drop table IF EXISTS serializable_values_false")
+      sql("drop table IF EXISTS insufficientColumn")
+      sql("drop table IF EXISTS insufficientColumn_false")
+      sql("drop table IF EXISTS emptyColumnValues")
+      sql("drop table IF EXISTS emptyColumnValues_false")
+      sql("drop table IF EXISTS empty_timestamp")
+      sql("drop table IF EXISTS empty_timestamp_false")
+      sql(
+        """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          new File("./target/test/badRecords")
+            .getCanonicalPath)
+
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      var csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
+          +
+          "('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
+          " ',', 'QUOTECHAR'= '\"')");
+
+      // 1.0 "\N" which should be treated as NULL
+      // 1.1 Time stamp "\N" which should be treated as NULL
+      csvFilePath = s"$resourcesPath/badrecords/seriazableValue.csv"
+      sql(
+        """CREATE TABLE IF NOT EXISTS serializable_values(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable false
+      sql(
+        """CREATE TABLE IF NOT EXISTS serializable_values_false(ID BigInt, date Timestamp,
+           country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql(
+        "LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE serializable_values_false OPTIONS"
+        + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // 2. insufficient columns - Bad records/Null value based on configuration
+      sql(
+        """CREATE TABLE IF NOT EXISTS insufficientColumn(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/insufficientColumns.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable false
+      sql(
+        """CREATE TABLE IF NOT EXISTS insufficientColumn_false(ID BigInt, date Timestamp, country
+            String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE insufficientColumn_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+      // 3. empty data for string data type - take empty value
+      // 4. empty data for non-string data type - Bad records/Null value based on configuration
+      //table should have only two records.
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+      // 4.1 Time stamp empty data - Bad records/Null value based on configuration
+      // 5. non-parsable data - Bad records/Null value based on configuration
+      // 6. empty line(check current one) - Bad records/Null value based on configuration
+      // only one value should be loaded.
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+          +
+          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+    } catch {
+      case x: Throwable => CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+  test("select count(*) from sales") {
+    sql("select count(*) from sales").show()
+    checkAnswer(
+      sql("select count(*) from sales"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from serializable_values") {
+    sql("select count(*) from serializable_values").show()
+    checkAnswer(
+      sql("select count(*) from serializable_values"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from serializable_values_false") {
+    sql("select count(*) from serializable_values_false").show()
+    checkAnswer(
+      sql("select count(*) from serializable_values_false"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from empty_timestamp") {
+    sql("select count(*) from empty_timestamp").show()
+    checkAnswer(
+      sql("select count(*) from empty_timestamp"),
+      Seq(Row(1)
+      )
+    )
+  }
+
+  test("select count(*) from insufficientColumn") {
+    sql("select count(*) from insufficientColumn").show()
+    checkAnswer(
+      sql("select count(*) from insufficientColumn"),
+      Seq(Row(1)
+      )
+    )
+  }
+
+  test("select count(*) from insufficientColumn_false") {
+    sql("select count(*) from insufficientColumn_false").show()
+    checkAnswer(
+      sql("select count(*) from insufficientColumn_false"),
+      Seq(Row(3)
+      )
+    )
+  }
+
+
+  test("select count(*) from emptyColumnValues") {
+    sql("select count(*) from emptyColumnValues").show()
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from emptyColumnValues_false") {
+    sql("select count(*) from emptyColumnValues_false").show()
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("select count(*) from empty_timestamp_false") {
+    sql("select count(*) from empty_timestamp_false").show()
+    checkAnswer(
+      sql("select count(*) from empty_timestamp_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+
+  override def afterAll {
+    sql("drop table sales")
+    sql("drop table serializable_values")
+    sql("drop table serializable_values_false")
+    sql("drop table insufficientColumn")
+    sql("drop table insufficientColumn_false")
+    sql("drop table emptyColumnValues")
+    sql("drop table emptyColumnValues_false")
+    sql("drop table empty_timestamp")
+    sql("drop table empty_timestamp_false")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
\ No newline at end of file
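
The nine LOAD DATA statements in beforeAll differ only in the target table and the bad-record options. A hypothetical helper (not part of this patch; it assumes only the 'bad_records_logger_enable' and 'bad_records_action' load options already used above) could factor out the repetition:

    // Hypothetical helper: issue a LOAD DATA statement with the bad-record
    // options used throughout this suite.
    def loadWithBadRecordOptions(csvPath: String, table: String,
        loggerEnable: Boolean, action: Option[String] = None): Unit = {
      val actionOpt = action.map(a => s", 'bad_records_action'='$a'").getOrElse("")
      sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE $table OPTIONS" +
          s"('bad_records_logger_enable'='$loggerEnable'$actionOpt, " +
          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')")
    }

Usage, matching the first load above: loadWithBadRecordOptions(csvFilePath, "sales", loggerEnable = true, Some("redirect")).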

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestAvgForBigInt.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestAvgForBigInt.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestAvgForBigInt.scala
new file mode 100644
index 0000000..361ff9b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestAvgForBigInt.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.bigdecimal
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestAvgForBigInt extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists carbonTable")
+    val csvFilePath = s"$resourcesPath/bigIntData.csv"
+
+    sql(
+      """
+      CREATE TABLE IF NOT EXISTS carbonTable (ID Int, date Timestamp, country String,
+      name String, phonetype String, serialname String, salary bigint)
+      STORED BY 'org.apache.carbondata.format'
+      """
+    )
+
+    sql(
+      "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table carbonTable"
+    )
+  }
+
+  test("test avg function on big int column") {
+    checkAnswer(
+      sql("select avg(salary) from carbonTable"),
+      sql("select sum(salary)/count(salary) from carbonTable")
+    )
+  }
+
+  override def afterAll {
+    sql("drop table if exists carbonTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
new file mode 100644
index 0000000..c554cbe
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.bigdecimal
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * Test cases for testing big decimal functionality
+  */
+class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE, "1")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT, "2")
+    sql("CREATE TABLE IF NOT EXISTS carbonTable (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))STORED BY 'org.apache.carbondata.format'")
+    sql("create table if not exists hiveTable(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))row format delimited fields terminated by ','")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table carbonTable")
+    sql(s"LOAD DATA local inpath '$resourcesPath/decimalDataWithoutHeader.csv' INTO table hiveTable")
+    sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
+    sql(s"LOAD DATA local inpath '$resourcesPath/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
+    sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")
+  }
+
+  test("test detail query on big decimal column") {
+    checkAnswer(sql("select salary from carbonTable order by salary"),
+      sql("select salary from hiveTable order by salary"))
+  }
+
+  test("test sum function on big decimal column") {
+    checkAnswer(sql("select sum(salary) from carbonTable"),
+      sql("select sum(salary) from hiveTable"))
+  }
+
+  test("test max function on big decimal column") {
+    checkAnswer(sql("select max(salary) from carbonTable"),
+      sql("select max(salary) from hiveTable"))
+  }
+
+  test("test min function on big decimal column") {
+    checkAnswer(sql("select min(salary) from carbonTable"),
+      sql("select min(salary) from hiveTable"))
+  }
+
+  test("test min datatype on big decimal column") {
+    val output = sql("select min(salary) from carbonTable").collectAsList().get(0).get(0)
+    assert(output.isInstanceOf[java.math.BigDecimal])
+  }
+
+  test("test max datatype on big decimal column") {
+    val output = sql("select max(salary) from carbonTable").collectAsList().get(0).get(0)
+    assert(output.isInstanceOf[java.math.BigDecimal])
+  }
+
+  test("test count function on big decimal column") {
+    checkAnswer(sql("select count(salary) from carbonTable"),
+      sql("select count(salary) from hiveTable"))
+  }
+
+  test("test distinct function on big decimal column") {
+    checkAnswer(sql("select distinct salary from carbonTable order by salary"),
+      sql("select distinct salary from hiveTable order by salary"))
+  }
+
+  test("test sum-distinct function on big decimal column") {
+    checkAnswer(sql("select sum(distinct salary) from carbonTable"),
+      sql("select sum(distinct salary) from hiveTable"))
+  }
+
+  test("test count-distinct function on big decimal column") {
+    checkAnswer(sql("select count(distinct salary) from carbonTable"),
+      sql("select count(distinct salary) from hiveTable"))
+  }
+  test("test filter query on big decimal column") {
+    // equal to
+    checkAnswer(sql("select salary from carbonTable where salary=45234525465882.24"),
+      sql("select salary from hiveTable where salary=45234525465882.24"))
+    // greater than
+    checkAnswer(sql("select salary from carbonTable where salary>15000"),
+      sql("select salary from hiveTable where salary>15000"))
+    // greater than equal to
+    checkAnswer(sql("select salary from carbonTable where salary>=15000.43525"),
+      sql("select salary from hiveTable where salary>=15000.43525"))
+    // less than
+    checkAnswer(sql("select salary from carbonTable where salary<45234525465882"),
+      sql("select salary from hiveTable where salary<45234525465882"))
+    // less than equal to
+    checkAnswer(sql("select salary from carbonTable where salary<=45234525465882.24"),
+      sql("select salary from hiveTable where salary<=45234525465882.24"))
+  }
+
+  test("test aggregation on big decimal column with increased precision") {
+    sql("drop table if exists carbonBigDecimal")
+    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10)) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
+
+    checkAnswer(sql("select sum(salary) from carbonBigDecimal"),
+      sql("select sum(salary) from hiveBigDecimal"))
+
+    checkAnswer(sql("select sum(distinct salary) from carbonBigDecimal"),
+      sql("select sum(distinct salary) from hiveBigDecimal"))
+
+    sql("drop table if exists carbonBigDecimal")
+  }
+
+  test("test big decimal for dictionary look up") {
+    sql("drop table if exists decimalDictLookUp")
+    sql("create table if not exists decimalDictLookUp (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10)) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('dictionary_include'='salary')")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table decimalDictLookUp")
+
+    checkAnswer(sql("select sum(salary) from decimalDictLookUp"),
+      sql("select sum(salary) from hiveBigDecimal"))
+
+    sql("drop table if exists decimalDictLookUp")
+  }
+
+  test("test sum+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)+10 from carbonBigDecimal_2"),
+      sql("select sum(salary)+10 from hiveBigDecimal"))
+  }
+
+  test("test sum*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
+      sql("select sum(salary)*10 from hiveBigDecimal"))
+  }
+
+  test("test sum/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)/10 from carbonBigDecimal_2"),
+      sql("select sum(salary)/10 from hiveBigDecimal"))
+  }
+
+  test("test sum-distinct+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))+10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))+10 from hiveBigDecimal"))
+  }
+
+  test("test sum-distinct*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))*10 from hiveBigDecimal"))
+  }
+
+  test("test sum-distinct/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))/10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))/10 from hiveBigDecimal"))
+  }
+
+  test("test avg+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)+10 from carbonBigDecimal_2"),
+      sql("select avg(salary)+10 from hiveBigDecimal"))
+  }
+
+  test("test avg*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)*10 from carbonBigDecimal_2"),
+      sql("select avg(salary)*10 from hiveBigDecimal"))
+  }
+
+  test("test avg/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)/10 from carbonBigDecimal_2"),
+      sql("select avg(salary)/10 from hiveBigDecimal"))
+  }
+
+  override def afterAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE,
+      CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+      CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
+  }
+}
+
+
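
A note on the high-precision aggregations above: java.math.BigDecimal arithmetic is exact, and the result scale of an addition is the larger of the two input scales, which is why the checkAnswer comparisons against Hive hold digit for digit (Spark additionally widens the declared precision of decimal sums, capped at 38). A small illustrative check, not part of this patch:

    import java.math.BigDecimal

    val values = Seq(new BigDecimal("45234525465882.24"), new BigDecimal("15000.43525"))
    val sum = values.reduce(_.add(_))                      // exact, no rounding
    assert(sum == new BigDecimal("45234525480882.67525"))
    assert(sum.scale == 5)                                 // result scale = max input scale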

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestNullAndEmptyFields.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestNullAndEmptyFields.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestNullAndEmptyFields.scala
new file mode 100644
index 0000000..94e7f1f
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestNullAndEmptyFields.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.bigdecimal
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * Test cases for columns containing null and empty values
+  */
+class TestNullAndEmptyFields extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+      )
+    val csvFilePath = s"$resourcesPath/nullandnonparsableValue.csv"
+    sql(
+      "CREATE TABLE IF NOT EXISTS carbonTable (ID String, date Timestamp, country String, name " +
+        "String, phonetype String, serialname String, salary Decimal(17,2))STORED BY 'org.apache" +
+        ".carbondata.format'"
+    )
+    sql(
+      "create table if not exists hiveTable(ID String, date Timestamp, country String, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Decimal(17,2))row format delimited fields " +
+        "terminated by ','"
+    )
+    sql(
+      "LOAD DATA LOCAL INPATH '" + csvFilePath + "' into table carbonTable OPTIONS " +
+        "('FILEHEADER'='ID,date," +
+        "country,name,phonetype,serialname,salary')"
+    )
+    sql(
+      "LOAD DATA local inpath '" + csvFilePath + "' INTO table hiveTable"
+    )
+  }
+
+
+  test("test detail query on column having null values") {
+    checkAnswer(
+      sql("select * from carbonTable"),
+      sql("select * from hiveTable")
+    )
+  }
+
+  test("test filter query on column is null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary is null"),
+      sql("select * from hiveTable where salary is null")
+    )
+  }
+
+  test("test filter query on column is not null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary is not null"),
+      sql("select * from hiveTable where salary is not null")
+    )
+  }
+
+  test("test filter query on columnValue=null") {
+    checkAnswer(
+      sql("select * from carbonTable where salary=null"),
+      sql("select * from hiveTable where salary=null")
+    )
+  }
+
+  test("test filter query where date is null") {
+    checkAnswer(
+      sql("select * from carbonTable where date is null"),
+      sql("select * from hiveTable where date is null")
+    )
+  }
+
+  test("test  subquery on column having null values") {
+    checkAnswer(
+      sql("select * from (select if(country='china','c', country) test from carbonTable)qq where test is null"),
+      sql("select * from (select if(country='china','c', country) test from hiveTable)qq where test is null")
+    )
+  }
+
+  test("test  subquery on column having not null values") {
+    checkAnswer(
+      sql("select * from (select if(country='china','c', country) test from carbonTable)qq where test is not null"),
+      sql("select * from (select if(country='china','c', country) test from hiveTable)qq where test is not null")
+    )
+  }
+
+  override def afterAll {
+    sql("drop table if exists carbonTable")
+    sql("drop table if exists hiveTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
new file mode 100644
index 0000000..3eccc70
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/BlockPruneQueryTestCase.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.blockprune
+
+import java.io.DataOutputStream
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+
+/**
+  * This class contains test cases for block prune query
+  */
+class BlockPruneQueryTestCase extends QueryTest with BeforeAndAfterAll {
+  val outputPath = s"$resourcesPath/block_prune_test.csv"
+  override def beforeAll {
+    // The data needed for block pruning is large, so generate a temporary data file.
+    val testData: Array[String]= new Array[String](3)
+    testData(0) = "a"
+    testData(1) = "b"
+    testData(2) = "c"
+    var writer: DataOutputStream = null
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (!file.exists()) {
+        file.createNewFile()
+      }
+      writer = FileFactory.getDataOutputStream(outputPath, fileType)
+      for (i <- 0 to 2) {
+        for (j <- 0 to 240000) {
+          writer.writeBytes(testData(i) + "," + j + "\n")
+        }
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "Build test file for block prune failed")
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex, "Close output stream catching exception")
+        }
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS blockprune")
+  }
+
+  test("test block prune query") {
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS blockprune (name string, id int)
+        STORED BY 'org.apache.carbondata.format'
+      """)
+    sql(
+        s"LOAD DATA LOCAL INPATH '$outputPath' INTO table blockprune options('FILEHEADER'='name,id')"
+      )
+    // data is in all 7 blocks
+    checkAnswer(
+      sql(
+        """
+          select name,count(name) as amount from blockprune
+          where name='c' or name='b' or name='a' group by name
+        """),
+      Seq(Row("a", 240001), Row("b", 240001), Row("c", 240001)))
+
+    // data only in middle 3/4/5 blocks
+    checkAnswer(
+      sql(
+        """
+          select name,count(name) as amount from blockprune
+          where name='b' group by name
+        """),
+      Seq(Row("b", 240001)))
+  }
+
+  override def afterAll {
+    // delete the temp data file
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (file.exists()) {
+        file.delete()
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "Delete temp test data file for block prune catching exception")
+    }
+    sql("DROP TABLE IF EXISTS blockprune")
+  }
+
+}
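
For context on why the second query touches fewer blocks: CarbonData records min/max statistics per block for each column, and a block whose [min, max] range cannot contain the filter value is skipped entirely. A simplified, hypothetical illustration of that decision (not the engine's actual code):

    // Simplified min/max pruning check (illustration only).
    case class BlockStats(min: String, max: String)

    def canSkip(block: BlockStats, filterValue: String): Boolean =
      filterValue < block.min || filterValue > block.max

    // With the generated data sorted on `name`, a filter name='b' skips blocks
    // holding only 'a' or only 'c'.
    val blocks = Seq(BlockStats("a", "a"), BlockStats("a", "b"),
                     BlockStats("b", "c"), BlockStats("c", "c"))
    assert(blocks.map(canSkip(_, "b")) == Seq(true, false, false, true))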

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
new file mode 100644
index 0000000..8b8b3c5
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * FT for data compaction scenario.
+  */
+class DataCompactionBlockletBoundryTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists  blocklettest")
+    sql("drop table if exists  Carbon_automation_hive")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+        "55")
+    sql(
+      "CREATE TABLE IF NOT EXISTS blocklettest (country String, ID String, date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+
+    // load more than 256 distinct values so that the column cardinality crosses the one-byte boundary.
+    val csvFilePath2 = s"$resourcesPath/compaction/compactioncard2.csv"
+
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE blocklettest OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE blocklettest  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // compaction will happen here.
+    sql("alter table blocklettest compact 'major'"
+    )
+
+    sql(
+      "create table Carbon_automation_hive (ID String, date " +
+      "Timestamp,country String, name String, phonetype String, serialname String, salary Int ) row format " +
+      "delimited fields terminated by ',' TBLPROPERTIES ('skip.header.line.count'='1') "
+    )
+
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/compaction/compaction1_forhive.csv" + "' INTO " +
+        "table Carbon_automation_hive ")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/compaction/compactioncard2_forhive.csv" + "' INTO " +
+        "table Carbon_automation_hive ")
+
+  }
+
+  test("select country,count(*) as a from blocklettest")({
+    sql("select country,count(*) as a from Carbon_automation_hive group by country").show()
+    sql("select *  from Carbon_automation_hive").show
+    checkAnswer(
+      sql("select country,count(*) as a from blocklettest group by country"),
+      sql("select country,count(*) as a from Carbon_automation_hive group by country")
+    )
+  }
+  )
+
+  override def afterAll {
+    sql("drop table if exists  blocklettest")
+    sql("drop table if exists  Carbon_automation_hive")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+        "" + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
new file mode 100644
index 0000000..5e49671
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * FT for data compaction Boundary condition verification.
+ */
+class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfterAll {
+  val carbonTableIdentifier: CarbonTableIdentifier =
+    new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
+
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
+    sql("drop table if exists  boundarytest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS boundarytest (country String, ID Int, date " +
+      "Timestamp, name " +
+      "String, " +
+      "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+      ".format'"
+    )
+
+  }
+
+  /**
+   * Compaction verification when no load has been performed.
+   */
+  test("check if compaction is completed correctly.") {
+
+    try {
+      sql("alter table boundarytest compact 'minor'")
+      sql("alter table boundarytest compact 'major'")
+    }
+    catch {
+      case e: Exception =>
+        assert(false)
+    }
+  }
+
+  /**
+   * Compaction verification when a single load has been performed.
+   */
+  test("check if compaction is completed correctly for one load.") {
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE boundarytest " +
+        "OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("alter table boundarytest compact 'minor'")
+    sql("alter table boundarytest compact 'major'")
+
+  }
+
+
+  override def afterAll {
+    sql("drop table if exists  boundarytest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
new file mode 100644
index 0000000..84a672c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+
+/**
+  * FT for data compaction scenario.
+  */
+class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    sql("drop table if exists  cardinalityTest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS cardinalityTest (country String, ID String, date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+
+    // load more than 256 distinct values so that the column cardinality crosses the one-byte boundary.
+    val csvFilePath2 = s"$resourcesPath/compaction/compactioncard2.csv"
+
+    val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv"
+
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE cardinalityTest OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE cardinalityTest  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath3 + "' INTO TABLE cardinalityTest  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // compaction will happen here.
+    sql("alter table cardinalityTest compact 'major'"
+    )
+
+  }
+
+  test("check if compaction is completed or not and  verify select query.") {
+    var status = true
+    var noOfRetries = 0
+    while (status && noOfRetries < 10) {
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+          AbsoluteTableIdentifier(
+            CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+            new CarbonTableIdentifier("default", "cardinalityTest", "1")
+          )
+      )
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+
+      if (!segments.contains("0.1")) {
+        // wait 500 ms before re-checking whether compaction has completed.
+        Thread.sleep(500)
+        noOfRetries += 1
+      }
+      else {
+        status = false
+      }
+    }
+    // now check the answers it should be same.
+    checkAnswer(
+      sql("select country,count(*) from cardinalityTest group by country"),
+      Seq(Row("america",1),
+        Row("canada",1),
+        Row("chile",1),
+        Row("china",2),
+        Row("england",1),
+        Row("burma",152),
+        Row("butan",101),
+        Row("mexico",1),
+        Row("newzealand",1),
+        Row("westindies",1),
+        Row("india",1),
+        Row("iran",1),
+        Row("iraq",1),
+        Row("ireland",1)
+      )
+    )
+  }
+
+  override def afterAll {
+    sql("drop table if exists  cardinalityTest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
+}
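
The retry loop above polls SegmentStatusManager until the merged segment ("0.1") appears. Extracted as a generic helper, the same wait could read as follows; a hypothetical sketch, not part of this patch:

    // Poll a condition with a bounded number of retries, sleeping between
    // attempts; returns whether the condition eventually held.
    def waitUntil(condition: => Boolean, retries: Int = 10, intervalMs: Long = 500): Boolean = {
      var attempt = 0
      while (!condition && attempt < retries) {
        Thread.sleep(intervalMs)
        attempt += 1
      }
      condition
    }

    // Usage (sketch): waitUntil(segmentStatusManager.getValidAndInvalidSegments
    //   .getValidSegments.asScala.contains("0.1"))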

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
new file mode 100644
index 0000000..4a6ce90
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory, ICarbonLock}
+
+
+/**
+  * FT for the data compaction locking scenario.
+  */
+class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
+
+  val absoluteTableIdentifier: AbsoluteTableIdentifier = new
+      AbsoluteTableIdentifier(
+        CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+        new CarbonTableIdentifier(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "compactionlocktesttable", "1")
+      )
+  val carbonTablePath: CarbonTablePath = CarbonStorePath
+    .getCarbonTablePath(absoluteTableIdentifier.getStorePath,
+      absoluteTableIdentifier.getCarbonTableIdentifier
+    )
+  val dataPath: String = carbonTablePath.getMetadataDirectoryPath
+
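+  // COMPACTION_LOCK is assumed to be the same lock the compaction flow itself
+  // acquires, so holding it from the test forces the ALTER ... COMPACT in
+  // beforeAll to fail instead of merging the segments.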
+  val carbonLock: ICarbonLock =
+    CarbonLockFactory
+      .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.COMPACTION_LOCK)
+
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION, "true")
+    sql("drop table if exists  compactionlocktesttable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS compactionlocktesttable (country String, ID Int, date " +
+        "Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+    var csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+
+    var csvFilePath2 = s"$resourcesPath/compaction/compaction2.csv"
+    var csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE compactionlocktesttable " +
+      "OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE compactionlocktesttable  " +
+      "OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath3 + "' INTO TABLE compactionlocktesttable  " +
+      "OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // acquire the compaction lock so that the compaction attempt below fails.
+    carbonLock.lockWithRetries()
+
+    // this compaction attempt should fail because the lock is already held.
+    try {
+      sql("alter table compactionlocktesttable compact 'major'")
+    }
+    catch {
+      case e: Exception =>
+        // expected: the compaction request cannot acquire the lock.
+        assert(true)
+    }
+  }
+
+  /**
+    * Compaction should fail as lock is being held purposefully
+    */
+  test("check if compaction is failed or not.") {
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+      absoluteTableIdentifier
+    )
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+
+    assert(!segments.contains("0.1"))
+  }
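+  // A minimal sketch (hypothetical, not part of this patch) of the
+  // acquire/release discipline the compaction flow is expected to follow:
+  //
+  //   val lock = CarbonLockFactory.getCarbonLockObj(tableId, LockUsage.COMPACTION_LOCK)
+  //   if (lock.lockWithRetries()) {
+  //     try { /* merge segments */ } finally { lock.unlock() }
+  //   } else {
+  //     sys.error("compaction lock is held by another process")
+  //   }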
+
+
+  override def afterAll {
+    sql("drop table if exists  compactionlocktesttable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    carbonLock.unlock()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
new file mode 100644
index 0000000..8a01791
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.datastore.TableSegmentUniqueIdentifier
+import org.apache.carbondata.core.carbon.datastore.block.SegmentTaskIndexWrapper
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.hadoop.CacheClient
+
+/**
+  * FT for the compaction scenario where a major-compacted segment must not be included in minor compaction.
+  */
+class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    sql("drop table if exists  ignoremajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS ignoremajor (country String, ID Int, date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+    val csvFilePath2 = s"$resourcesPath/compaction/compaction2.csv"
+    val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // trigger a major compaction on segments 0 and 1.
+    sql("alter table ignoremajor compact 'major'")
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // trigger a minor compaction on segments 2 and 3.
+    sql("alter table ignoremajor compact 'minor'")
+
+  }
+
+  /**
+    * Verify that segments merged by major compaction are not picked up again by minor compaction.
+    */
+  test("delete merged folder and check segments") {
+    // physically delete the segments that were merged away.
+    sql("clean files for table ignoremajor")
+    sql("select * from ignoremajor").show()
+    val identifier = new AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier(
+            CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
+        )
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+    // the merged segments 0.1 and 2.1 remain valid; their compacted source segments do not.
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    assert(segments.contains("0.1"))
+    assert(segments.contains("2.1"))
+    assert(!segments.contains("2"))
+    assert(!segments.contains("3"))
+    val cacheClient = new CacheClient(CarbonProperties.getInstance.
+      getProperty(CarbonCommonConstants.STORE_LOCATION))
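+    // After "clean files", the driver-side segment index cache is expected to
+    // have evicted the entry for the merged-away segment 2, so this lookup
+    // should return null.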
+    val segmentIdentifier = new TableSegmentUniqueIdentifier(identifier, "2")
+    val wrapper: SegmentTaskIndexWrapper = cacheClient.getSegmentAccessClient.
+      getIfPresent(segmentIdentifier)
+    assert(null == wrapper)
+
+  }
+
+  /**
+    * Delete should not work on compacted segment.
+    */
+  test("delete compacted segment and check status") {
+    try {
+      sql("delete segment 2 from table ignoremajor")
+      assert(false)
+    }
+    catch {
+      // expected: deleting a compacted segment must be rejected.
+      case _: Throwable => assert(true)
+    }
+    val carbontablePath = CarbonStorePath
+      .getCarbonTablePath(CarbonProperties.getInstance
+        .getProperty(CarbonCommonConstants.STORE_LOCATION),
+        new CarbonTableIdentifier(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
+      )
+      .getMetadataDirectoryPath
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
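+    // Assumption on ordering: tablestatus entries are appended as segments are
+    // created (0, 1, 0.1, 2, 3, 2.1), so index 3 is the original segment 2.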
+
+    // status should remain as compacted.
+    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
+
+  }
+
+  /**
+    * Delete should not work on compacted segment.
+    */
+  test("delete compacted segment by date and check status") {
+    sql(
+      "DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
+        " '2222-01-01 19:35:01'"
+    )
+    val carbontablePath = CarbonStorePath
+      .getCarbonTablePath(CarbonProperties.getInstance
+        .getProperty(CarbonCommonConstants.STORE_LOCATION),
+        new CarbonTableIdentifier(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
+      )
+      .getMetadataDirectoryPath
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
+
+    // status should remain as compacted for segment 2.
+    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
+    // segment 0.1 should be marked for delete.
+    assert(segs(2).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE))
+
+  }
+
+  override def afterAll {
+    sql("drop table if exists  ignoremajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
new file mode 100644
index 0000000..addbc45
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * FT for the compaction scenario where major compaction compacts only the
+  * segments that are present at the time the compaction is triggered.
+  */
+class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists  stopmajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID decimal(7,4), date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+    val csvFilePath1 = s"$resourcesPath/compaction/compaction1.csv"
+    val csvFilePath2 = s"$resourcesPath/compaction/compaction2.csv"
+    val csvFilePath3 = s"$resourcesPath/compaction/compaction3.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // trigger a major compaction of segments 0 and 1.
+    sql("alter table stopmajor compact 'major'")
+    Thread.sleep(2000)
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
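+    // Block until the merged segment 0.1 is visible so the test body sees a
+    // stable segment list.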
+    checkCompactionCompletedOrNot("0.1")
+
+  }
+
+  /**
+    * Poll until the compaction has produced the required merged segment.
+    *
+    * @param requiredSeg the merged segment id to wait for, e.g. "0.1"
+    * @return true if the segment became valid within the retry limit
+    */
+  def checkCompactionCompletedOrNot(requiredSeg: String): Boolean = {
+    var status = false
+    var noOfRetries = 0
+    while (!status && noOfRetries < 10) {
+
+      val identifier = new AbsoluteTableIdentifier(
+            CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+            new CarbonTableIdentifier(
+              CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
+          )
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      segments.foreach(seg =>
+        System.out.println("valid segment is = " + seg)
+      )
+
+      if (!segments.contains(requiredSeg)) {
+        // compaction may still be running; wait 2 seconds and retry.
+        System.out.println("sleeping for 2 seconds.")
+        Thread.sleep(2000)
+        noOfRetries += 1
+      }
+      else {
+        status = true
+      }
+    }
+    status
+  }
+
+  /**
+    * Verify that loads made after the major compaction was triggered were not merged into it.
+    */
+  test("delete merged folder and check segments") {
+    // physically delete the segments that were merged away.
+    sql("clean files for table stopmajor")
+
+    val identifier = new AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
+        )
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+    // segments 0 and 1 were merged into 0.1; segments 2 and 3, loaded after the compaction was triggered, stay unmerged.
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    assert(segments.contains("0.1"))
+    assert(!segments.contains("0.2"))
+    assert(!segments.contains("0"))
+    assert(!segments.contains("1"))
+    assert(segments.contains("2"))
+    assert(segments.contains("3"))
+
+  }
+
+  override def afterAll {
+    sql("drop table if exists  stopmajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/af2f204e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..1a4d3d8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.scalatest.BeforeAndAfterAll
+
+class TestDataLoadPartitionCoalescer extends QueryTest with BeforeAndAfterAll {
+  var nodeList: Array[String] = _
+
+  class DummyPartition(val index: Int,
+                       rawSplit: FileSplit) extends Partition {
+    val serializableHadoopSplit = new SerializableWritable(rawSplit)
+  }
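+  // DummyPartition only exists to carry a FileSplit whose hosts become the
+  // partition's preferred locations; Dummy below is a one-row-per-partition
+  // RDD that exposes those locations to the coalescer.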
+
+  class Dummy(sc: SparkContext, partitions: Array[Partition]) extends RDD[Row](sc, Nil) {
+    override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+      new Iterator[Row] {
+        var isFirst = true
+        override def hasNext: Boolean = isFirst
+
+        override def next(): Row = {
+          isFirst = false
+          new GenericRow(Array[Any]())
+        }
+      }
+    }
+
+    override protected def getPartitions: Array[Partition] = partitions
+
+    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+      split.asInstanceOf[DummyPartition].serializableHadoopSplit.value.getLocations.toSeq
+    }
+
+  }
+
+  override def beforeAll: Unit = {
+    nodeList = Array("host1", "host2", "host3")
+
+  }
+
+  def createPartition(index: Int, file: String, hosts: Array[String]) : Partition = {
+    new DummyPartition(index, new FileSplit(new Path(file), 0, 1, hosts))
+  }
+
+  def repartition(parts: Array[Partition]): Array[Partition] = {
+    new DataLoadPartitionCoalescer(new Dummy(sqlContext.sparkContext, parts), nodeList).run
+  }
+
+  def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+    DataLoadPartitionCoalescer.checkPartition(prevParts, parts)
+  }
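+  // Expected coalescer behaviour exercised below (an assumption of these
+  // tests): group input partitions by preferred host so each node in nodeList
+  // gets roughly the same number of splits, and never emit more partitions
+  // than there are nodes when every split is local to some node; e.g. five
+  // fully replicated splits over three hosts coalesce into three partitions.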
+
+  test("test number of partitions is more than nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+      createPartition(2, "3.csv", Array("host1", "host2", "host3")),
+      createPartition(3, "4.csv", Array("host1", "host2", "host3")),
+      createPartition(4, "5.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("test number of partitions equals nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+      createPartition(2, "3.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("test number of partitions is less than nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 2)
+    checkPartition(prevParts, parts)
+  }
+
+  test("all partitions are locality") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 2)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality1") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality2") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host3", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality3") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host7")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("all partition are not locality") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array()),
+      createPartition(1, "2.csv", Array()),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  override def afterAll {
+  }
+}
+

