Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC48F200C38 for ; Wed, 15 Mar 2017 14:28:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AB11C160B70; Wed, 15 Mar 2017 13:28:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AADAF160B78 for ; Wed, 15 Mar 2017 14:28:49 +0100 (CET) Received: (qmail 15605 invoked by uid 500); 15 Mar 2017 13:28:48 -0000 Mailing-List: contact commits-help@carbondata.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.incubator.apache.org Delivered-To: mailing list commits@carbondata.incubator.apache.org Received: (qmail 15570 invoked by uid 99); 15 Mar 2017 13:28:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2017 13:28:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 4F248C1412 for ; Wed, 15 Mar 2017 13:28:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.569 X-Spam-Level: X-Spam-Status: No, score=-3.569 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_NEUTRAL=0.652] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id HRn66M5In7PK for ; Wed, 15 Mar 2017 13:28:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) 
by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 0563060D05 for ; Wed, 15 Mar 2017 13:28:45 +0000 (UTC) Received: (qmail 14717 invoked by uid 99); 15 Mar 2017 13:28:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2017 13:28:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EBDB4DFE1E; Wed, 15 Mar 2017 13:28:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ravipesala@apache.org To: commits@carbondata.incubator.apache.org Date: Wed, 15 Mar 2017 13:28:44 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-carbondata git commit: [CARBONDATA-744] The property "spark.carbon.custom.distribution" should be changed to carbon.custom.block.distribution and should be part of CarbonProperties archived-at: Wed, 15 Mar 2017 13:28:50 -0000 Repository: incubator-carbondata Updated Branches: refs/heads/master d124a55d6 -> 6e333f02a [CARBONDATA-744] The property "spark.carbon.custom.distribution" should be changed to carbon.custom.block.distribution and should be part of CarbonProperties Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/fc3b6160 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/fc3b6160 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/fc3b6160 Branch: refs/heads/master Commit: fc3b61606606f430f19d77ce3adab2ccaaca1d07 Parents: d124a55 Author: mohammadshahidkhan Authored: Fri Mar 3 15:27:37 2017 +0530 Committer: ravipesala Committed: Wed Mar 15 18:58:02 2017 +0530 ---------------------------------------------------------------------- conf/carbon.properties.template | 2 + .../core/constants/CarbonCommonConstants.java | 2 + 
.../CarbonCustomBlockDistributionTest.scala | 116 +++++++++++++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 18 +-- 4 files changed, 130 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/conf/carbon.properties.template ---------------------------------------------------------------------- diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template index 9b85c75..ac2d20e 100644 --- a/conf/carbon.properties.template +++ b/conf/carbon.properties.template @@ -91,6 +91,8 @@ carbon.enable.quick.filter=false #carbon.tempstore.location=/opt/Carbon/TempStoreLoc ##data loading records count logger #carbon.load.log.counter=500000 +##To disable/enable carbon block distribution +#carbon.custom.block.distribution=false ######## Compaction Configuration ######## ##to specify number of segments to be preserved from compaction #carbon.numberof.preserve.segments=0 http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 3eb91b0..a70102b 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1154,6 +1154,8 @@ public final class CarbonCommonConstants { public static final String USE_KETTLE = "use_kettle"; public static final String USE_KETTLE_DEFAULT = "false"; + public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution"; + public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false"; private 
CarbonCommonConstants() { } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala new file mode 100644 index 0000000..6a8c23b --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/blockprune/CarbonCustomBlockDistributionTest.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.blockprune + +import java.io.DataOutputStream + +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties + +/** + * This class contains test cases for block prune query for carbon custom block distribution + */ +class CarbonCustomBlockDistributionTest extends QueryTest with BeforeAndAfterAll { + val outputPath = s"$resourcesPath/block_prune_test.csv" + override def beforeAll { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "true") + // Since the data needed for block prune is big, need to create a temp data file + val testData: Array[String]= new Array[String](3) + testData(0) = "a" + testData(1) = "b" + testData(2) = "c" + var writer: DataOutputStream = null + try { + val fileType = FileFactory.getFileType(outputPath) + val file = FileFactory.getCarbonFile(outputPath, fileType) + if (!file.exists()) { + file.createNewFile() + } + writer = FileFactory.getDataOutputStream(outputPath, fileType) + for (i <- 0 to 2) { + for (j <- 0 to 240000) { + writer.writeBytes(testData(i) + "," + j + "\n") + } + } + } catch { + case ex: Exception => + LOGGER.error(ex, "Build test file for block prune failed") + } finally { + if (writer != null) { + try { + writer.close() + } catch { + case ex: Exception => + LOGGER.error(ex, "Close output stream catching exception") + } + } + } + + sql("DROP TABLE IF EXISTS blockprune") + } + + test("test block prune query") { + sql( + """ + CREATE TABLE IF NOT EXISTS blockprune (name string, id int) + STORED BY 'org.apache.carbondata.format' + """) + sql( + s"LOAD DATA LOCAL INPATH '$outputPath' INTO table blockprune options('FILEHEADER'='name,id')" + ) + // data is in all 7 blocks + 
checkAnswer( + sql( + """ + select name,count(name) as amount from blockprune + where name='c' or name='b' or name='a' group by name + """), + Seq(Row("a", 240001), Row("b", 240001), Row("c", 240001))) + + // data only in middle 3/4/5 blocks + checkAnswer( + sql( + """ + select name,count(name) as amount from blockprune + where name='b' group by name + """), + Seq(Row("b", 240001))) + } + + override def afterAll { + // delete the temp data file + CarbonProperties.getInstance().addProperty("carbon.custom.distribution","false") + try { + val fileType = FileFactory.getFileType(outputPath) + val file = FileFactory.getCarbonFile(outputPath, fileType) + if (file.exists()) { + file.delete() + } + } catch { + case ex: Exception => + LOGGER.error(ex, "Delete temp test data file for block prune catching exception") + } + sql("DROP TABLE IF EXISTS blockprune") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/fc3b6160/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 09612cb..d24d29a 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -31,13 +31,14 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hive.DistributionUtil import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import 
org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} import org.apache.carbondata.hadoop._ import org.apache.carbondata.spark.load.CarbonLoaderUtil @@ -116,8 +117,9 @@ class CarbonScanRDD( i += 1 result.add(partition) } - } else if (sparkContext.getConf.contains("spark.carbon.custom.distribution") && - sparkContext.getConf.getBoolean("spark.carbon.custom.distribution", false)) { + } else if (CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) { // create a list of block based on split val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) @@ -207,10 +209,10 @@ class CarbonScanRDD( private var finished = false private var count = 0 - context.addTaskCompletionListener { context => - logStatistics(queryStartTime, count, model.getStatisticsRecorder) - reader.close() - } + context.addTaskCompletionListener { context => + logStatistics(queryStartTime, count, model.getStatisticsRecorder) + reader.close() + } override def hasNext: Boolean = { if (context.isInterrupted) { @@ -267,7 +269,7 @@ class CarbonScanRDD( } def logStatistics(queryStartTime: Long, recordCount: Int, - recorder: QueryStatisticsRecorder): Unit = { + recorder: QueryStatisticsRecorder): Unit = { var queryStatistic = new QueryStatistic() queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART, System.currentTimeMillis - queryStartTime)