Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5CE3A200D30 for ; Mon, 30 Oct 2017 10:21:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5B7B31609EF; Mon, 30 Oct 2017 09:21:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 526DB160BF8 for ; Mon, 30 Oct 2017 10:21:53 +0100 (CET) Received: (qmail 87138 invoked by uid 500); 30 Oct 2017 09:21:52 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 87110 invoked by uid 99); 30 Oct 2017 09:21:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Oct 2017 09:21:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C605DFCE5; Mon, 30 Oct 2017 09:21:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.apache.org Date: Mon, 30 Oct 2017 09:21:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/35] carbondata git commit: [CARBONDATA-1481] Add test cases for compaction of global sorted segment archived-at: Mon, 30 Oct 2017 09:21:55 -0000 [CARBONDATA-1481] Add test cases for compaction of global sorted segment Only test cases are added for compaction of global sorted segment in this PR This closes #1361 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: 
http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac6c1d2b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac6c1d2b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac6c1d2b Branch: refs/heads/streaming_ingest Commit: ac6c1d2b8ca9e68b98d6845f710b70d60a5f48c5 Parents: 133b303 Author: xubo245 <601450868@qq.com> Authored: Wed Sep 13 20:15:39 2017 +0800 Committer: Jacky Li Committed: Wed Oct 11 22:55:00 2017 +0800 ---------------------------------------------------------------------- .../src/test/resources/globalsort/sample1.csv | 8 +- .../src/test/resources/globalsort/sample2.csv | 8 +- .../src/test/resources/globalsort/sample3.csv | 8 +- ...CompactionSupportGlobalSortBigFileTest.scala | 136 +++++ ...ompactionSupportGlobalSortFunctionTest.scala | 535 +++++++++++++++++++ ...mpactionSupportGlobalSortParameterTest.scala | 534 ++++++++++++++++++ .../testsuite/sortcolumns/TestSortColumns.scala | 1 + 7 files changed, 1218 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv index 9cb11be..2fc7bc4 100644 --- a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv +++ b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv @@ -1,5 +1,5 @@ id,name,city,age -1,a,wuhan,10 -2,b,hangzhou,20 -3,c,beijing,30 -4,d,shenzhen,40 +10,a,wuhan,10 +4,y,hangzhou,20 +7,z,beijing,30 +1,d,shenzhen,40 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv index 300c254..75e7d93 100644 --- a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv +++ b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv @@ -1,5 +1,5 @@ id,name,city,age -5,e,wuhan,50 -6,f,hangzhou,60 -7,g,beijing,70 -eight,h,shenzhen,80 +11,c,wuhan,50 +2,f,hangzhou,60 +5,m,beijing,70 +eight,b,shenzhen,80 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv index 8e51dae..5eb6b02 100644 --- a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv +++ b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv @@ -1,5 +1,5 @@ id,name,city,age -9,i,wuhan,90 -10,j,hangzhou,100 -11,k,beijing,110 -12,l,shenzhen,120 +9,e,wuhan,90 +6,x,hangzhou,100 +3,k,beijing,110 +12,l,shenzhen,120 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala new file mode 100644 index 0000000..6d79f6c --- 
/dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.datacompaction + +import java.io.{File, PrintWriter} + +import scala.util.Random + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + val file1 = resourcesPath + "/compaction/fil1.csv" + val file2 = resourcesPath + "/compaction/fil2.csv" + val file3 = resourcesPath + "/compaction/fil3.csv" + val file4 = resourcesPath + "/compaction/fil4.csv" + val file5 = resourcesPath + "/compaction/fil5.csv" + + override protected def beforeAll(): Unit = { + resetConf("10") + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file1, n, 0) + 
CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + CompactionSupportGlobalSortBigFileTest.createFile(file3, n * 3, n * 5) + CompactionSupportGlobalSortBigFileTest.createFile(file4, n * 2, n * 8) + CompactionSupportGlobalSortBigFileTest.createFile(file5, n * 2, n * 13) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file1) + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + CompactionSupportGlobalSortBigFileTest.deleteFile(file3) + CompactionSupportGlobalSortBigFileTest.deleteFile(file4) + CompactionSupportGlobalSortBigFileTest.deleteFile(file5) + resetConf(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE) + } + + override def beforeEach { + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql( + """ + | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql("DROP TABLE IF EXISTS carbon_localsort") + sql( + """ + | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + } + + override def afterEach { + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql("DROP TABLE IF EXISTS carbon_localsort") + } + + test("Compaction major: segments size is bigger than default compaction size") { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE carbon_localsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE carbon_localsort OPTIONS('header'='false')") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort 
OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE compaction_globalsort OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE compaction_globalsort OPTIONS('header'='false')") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + checkAnswer(sql("select count(*) from compaction_globalsort"),sql("select count(*) from carbon_localsort")) + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + } + + private def resetConf(size:String) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, size) + } +} + +object CompactionSupportGlobalSortBigFileTest { + def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = { + try { + val write = new PrintWriter(fileName); + for (i <- start until (start + line)) { + write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80)) + } + write.close() + } catch { + case _: Exception => false + } + true + } + + def deleteFile(fileName: String): Boolean = { + try { + val file = new File(fileName) + if (file.exists()) { + file.delete() + } + } catch { + case _: Exception => false + } + true + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala new file mode 100644 index 0000000..6f8648d --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the"License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an"AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.datacompaction + +import java.io.{File, FilenameFilter} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + val filePath: String = s"$resourcesPath/globalsort" + val file1: String = resourcesPath + "/globalsort/sample1.csv" + val file2: String = resourcesPath + "/globalsort/sample2.csv" + val file3: String = resourcesPath + "/globalsort/sample3.csv" + + override def beforeEach { + resetConf + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql( + """ + | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql("DROP TABLE IF EXISTS carbon_localsort") + sql( + """ + | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + } + + override def afterEach { + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql("DROP TABLE IF EXISTS carbon_localsort") + } + + test("Compaction type: major") { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + checkExistence(sql("DESCRIBE FORMATTED 
compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 4) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 3) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"), + sql("SELECT * FROM carbon_localsort order by city,name limit 3")) + } + + test("Compaction type: minor, < default segments in level 1, not compact") { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + 
assert(!SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 3) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + } + + test("Compaction type: minor, >= default segments and < (default segments)*2 in level 1, compact once") { + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 4) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + } + + test("Compaction type: minor, >= default segments in level 1,compact twice in level 1") { + for (i <- 0 until 3) { + sql(s"LOAD DATA LOCAL INPATH '$file1' 
INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + } + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.contains("4.1")) + assert(!SegmentSequenceIds.contains("0.2")) + assert(SegmentSequenceIds.length == 11) + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(36))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + } + + test("Compaction type: minor, >= compacted segments in level 2,compact once in level 2") { + for (i <- 0 until 4) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + } + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE 
FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.contains("8.1")) + assert(SegmentSequenceIds.contains("0.2")) + assert(SegmentSequenceIds.length == 16) + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 15) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(48))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkAnswer(sql("SELECT * FROM compaction_globalsort limit 12"), + sql("SELECT * FROM carbon_localsort order by city,name limit 12")) + } + + test("Compaction: clean files, major") { + for (i <- 0 until 1) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'major'") + sql("clean files for table compaction_globalsort") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val 
segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 1) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"), + sql("SELECT * FROM carbon_localsort order by city,name limit 3")) + } + + test("Compaction: clean files, minor") { + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'minor'") + sql("clean files for table compaction_globalsort") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 3) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + 
} + + test("Compaction: global_sort_partitions=1, major") { + for (i <- 0 until 1) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'major'") + sql("clean files for table compaction_globalsort") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 1) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"), + sql("SELECT * FROM carbon_localsort order by city,name limit 3")) + } + + test("Compaction: global_sort_partitions=2, major") { + for (i <- 0 until 1) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE 
compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'major'") + sql("clean files for table compaction_globalsort") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 1) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + } + + test("Compaction: delete, major") { + for (i <- 0 until 1) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE 
compaction_globalsort COMPACT 'major'") + sql("clean files for table compaction_globalsort") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 1) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + sql("delete from table compaction_globalsort where SEGMENT.ID in (0.1)") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete") + } + + test("Compaction: delete, minor") { + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)") + sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)") + sql("ALTER TABLE compaction_globalsort COMPACT 'minor'") + checkExistence(sql("SHOW SEGMENTS FOR TABLE 
compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(!SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 6) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete") + } + + test("Compaction: load from file dictory, three csv file, major") { + for (i <- 0 until 6) { + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort") + } + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 3) + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72))) + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + } + + test("Compaction: load from file dictory, three csv file, minor") { + for (i <- 0 until 6) { + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort") + } + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 3) + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72))) + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + checkExistence(sql("SHOW SEGMENTS FOR TABLE 
compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + } + + test("Compaction: one file and no sort_columns") { + sql("DROP TABLE IF EXISTS compaction_globalsort2") + sql( + """ + | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + sql("clean files for table compaction_globalsort") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 1) + assert(status.filter(_.equals("Compacted")).length == 0) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12))) + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM compaction_globalsort2")) + sql("DROP TABLE IF EXISTS compaction_globalsort2") + } + + test("Compaction: global_sort sort_columns is int data type") { + sql("DROP TABLE IF EXISTS compaction_globalsort2") + sql( + """ + | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT) + | STORED 
BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2") + + sql("ALTER TABLE compaction_globalsort2 COMPACT 'MAJOR'") + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort2") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 4) + assert(status.filter(_.equals("Compacted")).length == 3) + + assert(getIndexFileCount("compaction_globalsort2", "0.1") === 1) + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12))) + sql("DROP TABLE IF EXISTS compaction_globalsort2") + } + + private def resetConf() { + val prop = CarbonProperties.getInstance() + prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + prop.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT) + prop.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) + } + + private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { + val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo + val list = new File(store).list(new FilenameFilter { + override def accept(dir: File, name: String) = name.endsWith(".carbonindex") + }) + list.size + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala new file mode 100644 index 0000000..1511b51 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.carbondata.spark.testsuite.datacompaction + +import java.io.{File, FilenameFilter} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { + val filePath: String = s"$resourcesPath/globalsort" + val file1: String = resourcesPath + "/globalsort/sample1.csv" + val file2: String = resourcesPath + "/globalsort/sample2.csv" + val file3: String = resourcesPath + "/globalsort/sample3.csv" + + override def beforeEach { + resetConf + + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql( + """ + | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + """.stripMargin) + + sql("DROP TABLE IF EXISTS carbon_localsort") + sql( + """ + | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + } + + override def afterEach { + sql("DROP TABLE IF EXISTS compaction_globalsort") + sql("DROP TABLE IF EXISTS carbon_localsort") + } + + test("MINOR, ENABLE_AUTO_LOAD_MERGE: false") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort 
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)") + sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)") + sql("ALTER TABLE compaction_globalsort COMPACT 'minor'") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(!SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 6) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + } + + test("MINOR, ENABLE_AUTO_LOAD_MERGE: true") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort 
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + + // loaded 6 times and produced 6 segments, + // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor), + // so total segment number is 7 + assert(SegmentSequenceIds.length == 7) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + } + + test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + "0") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW 
SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 4) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER) + } + + test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + "4") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) 
(0) } + assert(!SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 6) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 0) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER) + } + + test("MINOR, DAYS_ALLOWED_TO_COMPACT: 0") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + "0") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 4) + + 
assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) + } + + test("MINOR, DAYS_ALLOWED_TO_COMPACT: 4") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + "4") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 4) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) + } + + test("MAJOR, ENABLE_AUTO_LOAD_MERGE: false") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)") + sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)") + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 7) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete") + 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + } + + test("MAJOR, ENABLE_AUTO_LOAD_MERGE: true") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + + // loaded 6 times and produced 6 segments, + // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor), + // major compact and prodece 1 segment + // so total segment number is 8 + assert(SegmentSequenceIds.length == 8) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + } + + test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") { + 
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + "0") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 6) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER) + } + + test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + "4") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + 
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 2) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, + CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER) + } + + test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 0") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + "0") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + 
sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 6) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) + } + + test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 4") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + "4") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, 
"city,name") + + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(!SegmentSequenceIds.contains("4.1")) + assert(SegmentSequenceIds.length == 7) + + val status = segments.collect().map { each => (each.toSeq) (1) } + assert(status.filter(_.equals("Compacted")).length == 6) + + assert(getIndexFileCount("compaction_globalsort", "0.1") === 1) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) + } + private def resetConf() { + val prop = CarbonProperties.getInstance() + prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + prop.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT) + prop.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD) + } + + private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { + val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo + val list = new File(store).list(new FilenameFilter { + override def accept(dir: File, name: String) = name.endsWith(".carbonindex") + }) + list.size + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala index 6347241..b655025 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala @@ -91,6 +91,7 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll { "create table with no dictionary sort_columns where NumberOfNoDictSortColumns is less than " + "NoDictionaryCount") { + sql("drop table if exists sorttable1b") sql( "CREATE TABLE sorttable1b (empno String, empname String, designation String, doj Timestamp," + " workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +