From commits-return-12163-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Wed Jul 18 07:16:57 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D75EE180636 for ; Wed, 18 Jul 2018 07:16:55 +0200 (CEST) Received: (qmail 11029 invoked by uid 500); 18 Jul 2018 05:16:54 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 11020 invoked by uid 99); 18 Jul 2018 05:16:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jul 2018 05:16:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5ABF1DFB19; Wed, 18 Jul 2018 05:16:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kunalkapoor@apache.org To: commits@carbondata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: carbondata git commit: [CARBONDATA-2738]Block Preaggregate, Compaction, Dictionary Exclude/Include for child columns for Complex datatype Date: Wed, 18 Jul 2018 05:16:54 +0000 (UTC) Repository: carbondata Updated Branches: refs/heads/master a26be1b18 -> 0c9a60e01 [CARBONDATA-2738]Block Preaggregate, Compaction, Dictionary Exclude/Include for child columns for Complex datatype Block Preaggregate, Compaction, Dictionary Exclude/Include for child column s and Update Complex datatype columns for Complex datatype This closes #2501 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c9a60e0 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c9a60e0 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c9a60e0 Branch: refs/heads/master Commit: 0c9a60e017211a27033eb770c416cf3ac303d9d4 Parents: a26be1b Author: Indhumathi27 Authored: Thu Jul 12 21:11:11 2018 +0530 Committer: kunal642 Committed: Wed Jul 18 10:45:52 2018 +0530 ---------------------------------------------------------------------- .../complexType/TestComplexDataType.scala | 176 ++++++++++++++++++- ...ataWithMalformedCarbonCommandException.scala | 12 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 14 +- .../sql/events/MergeIndexEventListener.scala | 6 +- .../CarbonAlterTableCompactionCommand.scala | 6 + .../CarbonProjectForUpdateCommand.scala | 10 +- .../preaaggregate/PreAggregateUtil.scala | 10 +- .../strategy/StreamingTableStrategy.scala | 2 +- .../spark/sql/optimizer/CarbonIUDRule.scala | 9 +- 9 files changed, 224 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala index 45a9c7a..6470648 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala @@ -4,10 +4,11 @@ import java.sql.Timestamp import scala.collection.mutable -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties @@ -18,6 +19,16 @@ import org.apache.carbondata.core.util.CarbonProperties class TestComplexDataType extends QueryTest with BeforeAndAfterAll { + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS table1") + sql("DROP TABLE IF EXISTS test") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS table1") + sql("DROP TABLE IF EXISTS test") + } + test("test Projection PushDown for Struct - Integer type") { sql("DROP TABLE IF EXISTS table1") sql( @@ -712,5 +723,166 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select a.b,id,a.c,person.detail[0],d.e,d.f,person.detail[1],id from test"),Seq(Row(2,1,3,5,3,2,6,1))) checkAnswer(sql("select a.b,id,a.c,person.detail[0],d.e,d.f,person.detail[1],id,1,a.b from test"),Seq(Row(2,1,3,5,3,2,6,1,1,2))) } - + + test("test block Update for complex datatype") { + sql("DROP TABLE IF EXISTS test") + sql("create table test(id int,a struct,d array) stored by 'carbondata'") + sql("insert into test values(1,'2$3',4)") + val structException = intercept[UnsupportedOperationException]( + sql("update test set(a.b)=(4) where id=1").show(false)) + assertResult("Unsupported operation on Complex data type")(structException.getMessage) + val arrayException = intercept[UnsupportedOperationException]( + sql("update test set(a)=(4) where id=1").show(false)) + assertResult("Unsupported operation on Complex data type")(arrayException.getMessage) + } + + test("test block partition column") { + sql("DROP TABLE IF EXISTS test") + val arrayException = intercept[AnalysisException]( + sql(""" + | CREATE TABLE IF NOT EXISTS test + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | country array, + | salary Int + | ) + | PARTITIONED BY (area array) + | STORED BY 'carbondata' + """.stripMargin)) + assertResult("Cannot use array for partition column;")(arrayException.getMessage) + sql("DROP TABLE IF EXISTS test") + val structException = intercept[AnalysisException]( + sql(""" + | CREATE TABLE IF NOT EXISTS test + | ( + | id Int, + | vin string, + | logdate Timestamp, + | phonenumber Long, + | country array, + | salary Int + | ) + | PARTITIONED BY (area struct) + | STORED BY 'carbondata' + """.stripMargin) + ) + assertResult("Cannot use struct for partition column;")(structException.getMessage) + } + + test("test block preaggregate") { + sql("DROP TABLE IF EXISTS test") + sql("create table test(id int,a struct) stored by 'carbondata'") + sql("insert into test values (1,2)") + sql("insert into test values (1,2)") + sql("insert into test values (1,2)") + val structException = intercept[UnsupportedOperationException]( + sql("create datamap preagg_sum on table test using 'preaggregate' as select id,sum(a.b) from test group by id")) + assertResult("Preaggregate is unsupported for ComplexData type column: a.b")(structException.getMessage) + sql("DROP TABLE IF EXISTS test") + sql("create table test(id int,a array) stored by 'carbondata'") + sql("insert into test values (1,2)") + val arrayException = intercept[UnsupportedOperationException]( + sql("create datamap preagg_sum on table test using 'preaggregate' as select id,sum(a[0]) from test group by id")) + assertResult("Preaggregate is unsupported for ComplexData type column: a[0]")(arrayException.getMessage) + } + + test("test block dictionary exclude for child column") { + sql("DROP TABLE IF EXISTS table1") + sql( + "create table table1 (roll int,a struct,j:int>) stored " + + "by " + + "'carbondata' tblproperties('dictionary_exclude'='a')") + sql("insert into table1 values(1,'1$abc$2$efg$3:mno:4$5')") + checkAnswer(sql("select a.b from table1"), Seq(Row(1))) + sql("DROP TABLE IF EXISTS table1") + val structException = intercept[MalformedCarbonCommandException]( + sql( + "create table table1 (roll int,a struct,j:int>) stored " + + "by " + + "'carbondata' tblproperties('dictionary_exclude'='a.b')")) + assertResult( + "DICTIONARY_EXCLUDE column: a.b does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")( + structException.getMessage) + sql("DROP TABLE IF EXISTS table1") + val arrayException = intercept[MalformedCarbonCommandException]( + sql( + "create table table1 (roll int,a array) stored " + + "by " + + "'carbondata' tblproperties('dictionary_exclude'='a[0]')")) + assertResult( + "DICTIONARY_EXCLUDE column: a[0] does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")( + arrayException.getMessage) + } + + test("test block dictionary include for child column") { + sql("DROP TABLE IF EXISTS table1") + val structException = intercept[MalformedCarbonCommandException]( + sql( + "create table table1 (roll int,a struct,j:int>) stored " + + "by " + + "'carbondata' tblproperties('dictionary_include'='a.b')")) + assertResult( + "DICTIONARY_INCLUDE column: a.b does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")( + structException.getMessage) + sql("DROP TABLE IF EXISTS table1") + val arrayException = intercept[MalformedCarbonCommandException]( + sql( + "create table table1 (roll int,a array) stored " + + "by " + + "'carbondata' tblproperties('dictionary_include'='a[0]')")) + assertResult( + "DICTIONARY_INCLUDE column: a[0] does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")( + arrayException.getMessage) + } + + test("test block compaction") { + sql("DROP TABLE IF EXISTS table1") + sql( + "create table table1 (roll int,person Struct) stored " + + "by 'carbondata'") + sql( + "load data inpath '" + resourcesPath + + "/Struct.csv' into table table1 options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'='&')") + sql( + "load data inpath '" + resourcesPath + + "/Struct.csv' into table table1 options('delimiter'=','," + + "'quotechar'='\"','fileheader'='roll,person','complex_delimiter_level_1'='$'," + + "'complex_delimiter_level_2'='&')") + val exception = intercept[UnsupportedOperationException]( + sql("alter table table1 compact 'major'")) + assertResult( + "Compaction is unsupported for Table containing Complex Columns")( + exception.getMessage) + val exception1 = intercept[UnsupportedOperationException]( + sql("alter table table1 compact 'minor'")) + assertResult( + "Compaction is unsupported for Table containing Complex Columns")( + exception1.getMessage) + val exception2 = intercept[UnsupportedOperationException]( + sql("alter table table1 compact 'custom' where segment.id in (0,1)")) + assertResult( + "Compaction is unsupported for Table containing Complex Columns")( + exception2.getMessage) + } + + test("test complex datatype double for encoding") { + sql("DROP TABLE IF EXISTS table1") + sql( + "create table table1 (person struct) stored by 'carbondata'") + sql("insert into table1 values('1000000000')") + checkExistence(sql("select * from table1"),true,"1.0E9") + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala index 6759049..5490f06 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala @@ -83,16 +83,20 @@ class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with Bef val e = intercept[MalformedCarbonCommandException] { buildTableWithNoExistDictExclude() } - assert(e.getMessage.equals("DICTIONARY_EXCLUDE column: ccc does not exist in table. " + - "Please check create table statement.")) + assert(e.getMessage + .equals( + "DICTIONARY_EXCLUDE column: ccc does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")) } test("test load data with dictionary include columns which no exist in table.") { val e = intercept[MalformedCarbonCommandException] { buildTableWithNoExistDictInclude() } - assert(e.getMessage.equals("DICTIONARY_INCLUDE column: aaa does not exist in table. " + - "Please check create table statement.")) + assert(e.getMessage + .equals( + "DICTIONARY_INCLUDE column: aaa does not exist in table or unsupported for complex child " + + "column. Please check create table statement.")) } test("test load data with dictionary include is same with dictionary exclude") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 44adff3..97fe51a 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -722,16 +722,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { .foreach { dictExcludeCol => if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) { val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol + - " does not exist in table. Please check create table statement." + " does not exist in table or unsupported for complex child column. " + + "Please check create table statement." throw new MalformedCarbonCommandException(errormsg) } else { val dataType = fields.find(x => x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get - if (isComplexDimDictionaryExclude(dataType)) { - val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " + - dictExcludeCol - throw new MalformedCarbonCommandException(errormsg) - } else if (!isDataTypeSupportedForDictionary_Exclude(dataType)) { + if (!isDataTypeSupportedForDictionary_Exclude(dataType)) { val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() + " data type column: " + dictExcludeCol throw new MalformedCarbonCommandException(errorMsg) @@ -750,7 +747,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { dictIncludeCols.foreach { distIncludeCol => if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) { val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim + - " does not exist in table. Please check create table statement." + " does not exist in table or unsupported for complex child column. " + + "Please check create table statement." throw new MalformedCarbonCommandException(errormsg) } if (varcharCols.exists(x => x.equalsIgnoreCase(distIncludeCol.trim))) { @@ -874,7 +872,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { * detects whether datatype is part of dictionary_exclude */ def isDataTypeSupportedForDictionary_Exclude(columnDataType: String): Boolean = { - val dataTypes = Array("string", "timestamp", "int", "long", "bigint") + val dataTypes = Array("string", "timestamp", "int", "long", "bigint", "struct", "array") dataTypes.exists(x => x.equalsIgnoreCase(columnDataType)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index dd64423..dff3424 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -115,7 +115,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging { String]() loadFolderDetailsArray.foreach(loadMetadataDetails => { segmentFileNameMap - .put(loadMetadataDetails.getLoadName, loadMetadataDetails.getSegmentFile) + .put(loadMetadataDetails.getLoadName, + String.valueOf(loadMetadataDetails.getLoadStartTime)) }) CommonUtil.mergeIndexFiles(sparkSession.sparkContext, validSegmentIds, @@ -165,7 +166,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging { .readLoadMetadata(carbonTable.getMetadataPath) val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, String]() loadFolderDetailsArray.foreach(loadMetadataDetails => { - segmentFileNameMap.put(loadMetadataDetails.getLoadName, loadMetadataDetails.getSegmentFile) + segmentFileNameMap + .put(loadMetadataDetails.getLoadName, String.valueOf(loadMetadataDetails.getLoadStartTime)) }) // filter out only the valid segments from the list of compacted segments // Example: say compacted segments list contains 0.1, 3.1, 6.1, 0.2. http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index a4e52c3..a4adbbb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -82,6 +82,12 @@ case class CarbonAlterTableCompactionCommand( throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } + if (table.getTableInfo.getFactTable.getListOfColumns.asScala + .exists(m => m.getDataType.isComplexType)) { + throw new UnsupportedOperationException( + "Compaction is unsupported for Table containing Complex Columns") + } + if (CarbonUtil.hasAggregationDataMap(table) || (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) { // If the compaction request is of 'streaming' type then we need to generate loadCommands http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 573ea9a..964407f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -42,7 +42,8 @@ import org.apache.carbondata.processing.loading.FailureCauses private[sql] case class CarbonProjectForUpdateCommand( plan: LogicalPlan, databaseNameOp: Option[String], - tableName: String) + tableName: String, + columns: List[String]) extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { @@ -59,6 +60,13 @@ private[sql] case class CarbonProjectForUpdateCommand( return Seq.empty } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + columns.foreach { col => + val dataType = carbonTable.getColumnByName(tableName, col).getColumnSchema.getDataType + if (dataType.isComplexType) { + throw new UnsupportedOperationException("Unsupported operation on Complex data type") + } + + } if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 06ebc43..ecadf41 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAl import org.apache.spark.sql.CarbonExpressions.MatchCastExpression import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation} -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSeq, Cast, Expression, ExprId, NamedExpression, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, _} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} @@ -354,7 +354,13 @@ object PreAggregateUtil { !expression.isInstanceOf[AttributeReference]) { newColumnName } else { - expression.asInstanceOf[AttributeReference].name + if (expression.isInstanceOf[GetStructField] || expression.isInstanceOf[GetArrayItem]) { + throw new UnsupportedOperationException( + "Preaggregate is unsupported for ComplexData type column: " + + expression.simpleString.replaceAll("#[0-9]*", "")) + } else { + expression.asInstanceOf[AttributeReference].name + } } createField(columnName, dataType, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala index f4240e4..0849634 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -34,7 +34,7 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp override def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { - case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName) => + case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName, columns) => rejectIfStreamingTable( TableIdentifier(tableName, databaseNameOp), "Data update") http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c9a60e0/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala index 7300fe9..ae5825d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonIUDRule.scala @@ -40,6 +40,13 @@ class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper { val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList .splitAt(pList.size - cols.size) val diff = cols.diff(dest.map(_.name.toLowerCase)) + cols.foreach { col => + val complexExists = "\"name\":\"" + col + "\"" + if (dest.exists(m => m.dataType.json.contains(complexExists))) { + throw new UnsupportedOperationException( + "Unsupported operation on Complex data type") + } + } if (diff.nonEmpty) { sys.error(s"Unknown column(s) ${ diff.mkString(",") } in table ${ table.tableName }") } @@ -47,7 +54,7 @@ class CarbonIUDRule extends Rule[LogicalPlan] with PredicateHelper { Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child) } CarbonProjectForUpdateCommand( - newPlan, table.tableIdentifier.database, table.tableIdentifier.table) + newPlan, table.tableIdentifier.database, table.tableIdentifier.table, cols) } } }