From: jackylk@apache.org
To: commits@carbondata.incubator.apache.org
Date: Thu, 19 Jan 2017 15:20:09 -0000
Subject: [1/2] incubator-carbondata git commit: Fixed insert with select query when functions are used in query

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master db2c5f917 -> 68a16d2c9


Fixed insert with select query when functions are used in query

reverted example

Fixed testcase
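In practice the broken pattern was an insert-select whose projection applies a SQL function. A minimal reproduction, using the table layout from the test added below (the CarbonSession `spark` and the populated source table are assumed to exist); before this fix such a statement failed during planning, and the strategy change below suggests why: every projected column was assumed to be a bare AttributeReference, while ASCII(stringField) is a function expression:

    // Hedged sketch of the failing query shape, based on the new test case.
    spark.sql(
      """
        | INSERT INTO TABLE carbon_table1
        | SELECT shortField, intField, bigintField, doubleField, ASCII(stringField),
        |        timestampField, decimalField, dateField, charField, floatField
        | FROM carbon_table
      """.stripMargin).show()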
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/529c06d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/529c06d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/529c06d2
Branch: refs/heads/master
Commit: 529c06d22d715249cf2e42db3420af698f0f917a
Parents: db2c5f9
Author: ravipesala
Authored: Thu Jan 19 06:30:24 2017 +0530
Committer: jackylk
Committed: Thu Jan 19 23:19:26 2017 +0800

----------------------------------------------------------------------
 .../examples/CarbonSessionExample.scala        | 19 +++++-
 .../src/test/resources/data_with_all_types.csv | 10 ++++
 .../InsertIntoCarbonTableTestCase.scala        | 62 ++++++++++++++++----
 .../execution/CarbonLateDecodeStrategy.scala   | 45 +++++++-------
 4 files changed, 102 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 1d485cd..0d9c43f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -31,10 +32,19 @@ object CarbonSessionExample {
       + "../../../..").getCanonicalPath
     val storeLocation = s"$rootPath/examples/spark2/target/store"
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target"
+    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
+
+    // clean data folder
+    if (true) {
+      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
+      clean(storeLocation)
+      clean(warehouse)
+      clean(metastoredb)
+    }
 
     CarbonProperties.getInstance()
       .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
+      .addProperty("carbon.storelocation", storeLocation)
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
 
     import org.apache.spark.sql.CarbonSession._
@@ -42,9 +52,12 @@ object CarbonSessionExample {
     val spark = SparkSession
       .builder()
       .master("local")
-      .appName("CarbonSessionExample")
+      .appName("CarbonExample")
+      .enableHiveSupport()
       .config("spark.sql.warehouse.dir", warehouse)
-      .getOrCreateCarbonSession(storeLocation, metastoredb)
+      .config("javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$metastoredb;create=true")
+      .getOrCreateCarbonSession()
 
     spark.sparkContext.setLogLevel("WARN")
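The example now wipes its store, warehouse, and metastore directories before every run, so each execution starts from a clean local state. A minimal sketch of that reset pattern, assuming only commons-io on the classpath (the directory names mirror the example above):

    import java.io.File
    import org.apache.commons.io.FileUtils

    // FileUtils.deleteDirectory removes the tree recursively and returns
    // quietly when the directory does not exist, so the reset is safe on a
    // fresh checkout as well as on a dirty target directory.
    def resetLocalState(dirs: String*): Unit =
      dirs.foreach(dir => FileUtils.deleteDirectory(new File(dir)))

    resetLocalState(storeLocation, warehouse, metastoredb)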
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark-common-test/src/test/resources/data_with_all_types.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data_with_all_types.csv b/integration/spark-common-test/src/test/resources/data_with_all_types.csv
new file mode 100644
index 0000000..5efff00
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data_with_all_types.csv
@@ -0,0 +1,10 @@
+1,10,100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23 11:01:01,aaa,2.5
+5,17,140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27 11:01:02,bbb,2.5
+1,11,100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23 11:01:03,ccc,2.5
+1,10,150,43.4,spark,2015/7/24 12:01:04,254.12,2015/7/24 11:01:04,ddd,2.5
+1,10,100,47.4,spark,2015/7/23 12:01:05,876.14,2015/7/23 11:01:05,eeee,3.5
+3,14,160,43.4,hive,2015/7/26 12:01:06,3454.32,2015/7/26 11:01:06,ff,2.5
+2,10,100,43.4,impala,2015/7/23 12:01:07,456.98,2015/7/23 11:01:07,ggg,2.5
+1,10,100,43.4,spark,2015/5/23 12:01:08,32.53,2015/5/23 11:01:08,hhh,2.5
+4,16,130,42.4,impala,2015/7/23 12:01:09,67.23,2015/7/23 11:01:09,iii,2.5
+1,10,100,43.4,spark,2015/7/23 12:01:10,832.23,2015/7/23 11:01:10,jjj,2.5


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 3cb3520..db85393 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -113,16 +113,58 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       sql("select imei,deviceInformationId,MAC from TCarbonLocal")
     )
   }
-//  test("insert->insert empty data -pass") {
-//    sql("drop table if exists TCarbon")
-//    sql("create table TCarbon (imei string,deviceInformationId int,MAC string) STORED BY 'org.apache.carbondata.format'")
-//    sql("insert into TCarbon select imei,deviceInformationId,MAC from THive where MAC='wrongdata'")
-//    val result = sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'").collect()
-//    checkAnswer(
-//      sql("select imei,deviceInformationId,MAC from THive where MAC='wrongdata'"),
-//      sql("select imei,deviceInformationId,MAC from TCarbon where MAC='wrongdata'")
-//    )
-//  }
+
+  test("insert->insert with functions") {
+    sql("DROP TABLE IF EXISTS carbon_table")
+    sql("DROP TABLE IF EXISTS carbon_table1")
+    // Create table
+    sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         | shortField smallint,
+         | intField int,
+         | bigintField bigint,
+         | doubleField double,
+         | stringField string,
+         | timestampField timestamp,
+         | decimalField decimal(18,2),
+         | dateField date,
+         | charField string,
+         | floatField float
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    sql(
+      s"""
+         | CREATE TABLE carbon_table1(
+         | shortField smallint,
+         | intField int,
+         | bigintField bigint,
+         | doubleField double,
+         | stringField string,
+         | timestampField timestamp,
+         | decimalField decimal(18,2),
+         | dateField date,
+         | charField string,
+         | floatField float
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '${resourcesPath + "/data_with_all_types.csv"}'
+         | INTO TABLE carbon_table
+         | options('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField')
+       """.stripMargin)
+
+    sql("""insert into table carbon_table1 select shortField,intField,bigintField,doubleField,ASCII(stringField),
+           timestampField,decimalField,dateField,charField,floatField from carbon_table
+        """).show
+  }
+
   test("insert into existing load-pass") {
     val timeStampPropOrig =
       CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
     CarbonProperties.getInstance()
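The new test drives an insert-select through a function call. The detail that matters to the planner: once ASCII(stringField) appears in the projection, the analyzed plan carries an Alias over the function expression rather than a bare column reference. A hedged illustration (df stands for a hypothetical DataFrame over carbon_table):

    // In Spark 2.x a function in the select list shows up in the project
    // list as Alias(Ascii(...), "ascii(stringField)"), not as an
    // AttributeReference.
    val projected = df.selectExpr("ascii(stringField)")
    projected.queryExecution.analyzed.expressions.foreach {
      case a: org.apache.spark.sql.catalyst.expressions.AttributeReference =>
        println(s"attribute: ${a.name}")
      case e =>
        println(s"non-attribute expression: ${e.getClass.getSimpleName}")
    }

This is exactly the expression shape the strategy change below has to tolerate.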
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/529c06d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 6f913a8..11166bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -223,7 +223,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         pushedFilters,
         metadata,
         needDecoder,
-        updateRequestedColumns)
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -231,15 +231,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
       val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       val scan = getDataSourceScan(relation,
-        updateRequestedColumns,
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]],
         scanBuilder,
         candidatePredicates,
         pushedFilters,
         metadata,
         needDecoder,
-        updateRequestedColumns)
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       execution.ProjectExec(
-        projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+        updateRequestedColumnsFunc(projects, table, needDecoder).asInstanceOf[Seq[NamedExpression]],
+        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
   }
 
@@ -251,7 +252,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       pushedFilters: Seq[Filter],
       metadata: Map[String, String],
       needDecoder: ArrayBuffer[AttributeReference],
-      updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = {
+      updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
     val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
         needDecoder.isEmpty) {
@@ -272,29 +273,31 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
-  def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
+  def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
       relation: CarbonDatasourceHadoopRelation,
-      needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
+      needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
     val map = relation.carbonRelation.metaData.dictionaryMap
-    requestedColumns.map { attr =>
-      if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
-        attr
-      } else {
-        val dict = map.get(attr.name)
-        if (dict.isDefined && dict.get) {
-          AttributeReference(attr.name,
-            IntegerType,
-            attr.nullable,
-            attr.metadata)(attr.exprId, attr.qualifier)
-        } else {
+    requestedColumns.map {
+      case attr: AttributeReference =>
+        if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
           attr
+        } else {
+          val dict = map.get(attr.name)
+          if (dict.isDefined && dict.get) {
+            AttributeReference(attr.name,
+              IntegerType,
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier)
+          } else {
+            attr
+          }
+        }
-      }
+      case others => others
     }
   }
 
   private def getPartitioning(carbonTable: CarbonTable,
-      output: Seq[AttributeReference]): Partitioning = {
+      output: Seq[Attribute]): Partitioning = {
     val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
     if (info != null) {
       val cols = info.getListOfColumns.asScala
@@ -304,7 +307,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     val bucketColumns = cols.flatMap { n =>
       val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
       attrRef match {
-        case Some(attr) =>
+        case Some(attr: AttributeReference) =>
           Some(AttributeReference(attr.name,
             CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
             attr.nullable,
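The heart of the fix is updateRequestedColumnsFunc: it now takes Seq[Expression] and rewrites only the AttributeReferences it recognizes as dictionary-encoded columns, passing every other expression through untouched. A standalone sketch of that idiom under simplified assumptions (dictionaryColumns stands in for the relation's dictionaryMap lookup; the AttributeReference constructor call mirrors the one in the diff):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
    import org.apache.spark.sql.types.IntegerType

    // Dictionary-encoded columns are read as surrogate integer keys and decoded
    // later, so the planner rewrites them to IntegerType here; anything else,
    // e.g. Alias(Ascii(...)) from a function in the query, flows through
    // unchanged -- the same shape as the match in the hunk above.
    def rewriteDictionaryColumns(
        exprs: Seq[Expression],
        dictionaryColumns: Set[String]): Seq[Expression] =
      exprs.map {
        case attr: AttributeReference if dictionaryColumns(attr.name.toLowerCase) =>
          AttributeReference(attr.name, IntegerType, attr.nullable,
            attr.metadata)(attr.exprId, attr.qualifier)
        case other => other
      }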