carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: [CARBONDATA-744] The property "spark.carbon.custom.distribution" should be changed to carbon.custom.block.distribution and should be part of CarbonProperties
Date Wed, 15 Mar 2017 13:28:44 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master d124a55d6 -> 6e333f02a


[CARBONDATA-744] The property "spark.carbon.custom.distribution" should be changed to carbon.custom.block.distribution
and should be part of CarbonProperties


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fc3b6160
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fc3b6160
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fc3b6160

Branch: refs/heads/master
Commit: fc3b61606606f430f19d77ce3adab2ccaaca1d07
Parents: d124a55
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Fri Mar 3 15:27:37 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Mar 15 18:58:02 2017 +0530

----------------------------------------------------------------------
 conf/carbon.properties.template                 |   2 +
 .../core/constants/CarbonCommonConstants.java   |   2 +
 .../CarbonCustomBlockDistributionTest.scala     | 116 +++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  18 +--
 4 files changed, 130 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index 9b85c75..ac2d20e 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -91,6 +91,8 @@ carbon.enable.quick.filter=false
 #carbon.tempstore.location=/opt/Carbon/TempStoreLoc
 ##data loading records count logger
 #carbon.load.log.counter=500000
+##To disable/enable carbon block distribution
+#carbon.custom.block.distribution=false
 ######## Compaction Configuration ########
 ##to specify number of segments to be preserved from compaction
 #carbon.numberof.preserve.segments=0

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 3eb91b0..a70102b 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1154,6 +1154,8 @@ public final class CarbonCommonConstants {
   public static final String USE_KETTLE = "use_kettle";
 
   public static final String USE_KETTLE_DEFAULT = "false";
+  public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
+  public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
 
   private CarbonCommonConstants() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala
new file mode 100644
index 0000000..6a8c23b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.blockprune
+
+import java.io.DataOutputStream
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * This class contains test cases for block prune query for carbon custom block distribution
+  */
+class CarbonCustomBlockDistributionTest extends QueryTest with BeforeAndAfterAll {
+  val outputPath = s"$resourcesPath/block_prune_test.csv"
+  /** Enables custom block distribution and writes the CSV loaded by the test. */
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "true")
+    // Since the data needed for block prune is big, need to create a temp data file
+    val testData = Array("a", "b", "c")
+    var writer: DataOutputStream = null
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (!file.exists()) {
+        file.createNewFile()
+      }
+      writer = FileFactory.getDataOutputStream(outputPath, fileType)
+      // 240001 "name,id" rows per name; the counts asserted in the test rely on this
+      for (name <- testData; j <- 0 to 240000) {
+        writer.writeBytes(name + "," + j + "\n")
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "Build test file for block prune failed")
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex, "Close output stream catching exception")
+        }
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS blockprune")
+  }
+
+  test("test block prune query") {
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS blockprune (name string, id int)
+        STORED BY 'org.apache.carbondata.format'
+      """)
+    sql(
+        s"LOAD DATA LOCAL INPATH '$outputPath' INTO table blockprune options('FILEHEADER'='name,id')"
+      )
+    // filter on all three names: every generated row is counted
+    checkAnswer(
+      sql(
+        """
+          select name,count(name) as amount from blockprune
+          where name='c' or name='b' or name='a' group by name
+        """),
+      Seq(Row("a", 240001), Row("b", 240001), Row("c", 240001)))
+
+    // filter on a single name: only the rows written for 'b' are counted
+    checkAnswer(
+      sql(
+        """
+          select name,count(name) as amount from blockprune
+          where name='b' group by name
+        """),
+      Seq(Row("b", 240001)))
+  }
+
+  /** Resets the distribution flag, deletes the CSV and drops the test table. */
+  override def afterAll {
+    // restore the default for the flag beforeAll enabled; the key must be the same constant
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT)
+    // delete the temp data file
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (file.exists()) {
+        file.delete()
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "Delete temp test data file for block prune catching exception")
+    }
+    sql("DROP TABLE IF EXISTS blockprune")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 09612cb..d24d29a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -31,13 +31,14 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
@@ -116,8 +117,9 @@ class CarbonScanRDD(
           i += 1
           result.add(partition)
         }
-      } else if (sparkContext.getConf.contains("spark.carbon.custom.distribution") &&
-                 sparkContext.getConf.getBoolean("spark.carbon.custom.distribution", false))
{
+      } else if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) {
         // create a list of block based on split
         val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
 
@@ -207,10 +209,10 @@ class CarbonScanRDD(
         private var finished = false
         private var count = 0
 
-      context.addTaskCompletionListener { context =>
-        logStatistics(queryStartTime, count, model.getStatisticsRecorder)
-        reader.close()
-      }
+        context.addTaskCompletionListener { context =>
+          logStatistics(queryStartTime, count, model.getStatisticsRecorder)
+          reader.close()
+        }
 
         override def hasNext: Boolean = {
           if (context.isInterrupted) {
@@ -267,7 +269,7 @@ class CarbonScanRDD(
   }
 
   def logStatistics(queryStartTime: Long, recordCount: Int,
-                    recorder: QueryStatisticsRecorder): Unit = {
+      recorder: QueryStatisticsRecorder): Unit = {
     var queryStatistic = new QueryStatistic()
     queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
       System.currentTimeMillis - queryStartTime)


Mime
View raw message