carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] incubator-carbondata git commit: Insert Select Into Same Table
Date Thu, 06 Apr 2017 04:49:19 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 950a6d0f5 -> ada081d89


Insert Select Into Same Table


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f527d3d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f527d3d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f527d3d2

Branch: refs/heads/master
Commit: f527d3d2e460076705834393182199c589ed318e
Parents: 950a6d0
Author: sounakr <sounakr@gmail.com>
Authored: Mon Apr 3 12:48:20 2017 +0530
Committer: sounakr <sounakr@gmail.com>
Committed: Wed Apr 5 23:52:20 2017 +0530

----------------------------------------------------------------------
 .../InsertIntoCarbonTableTestCase.scala         | 28 +++++++++
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 43 +++++++-------
 .../spark/sql/CarbonCatalystOperators.scala     | 19 +++++++
 .../sql/CarbonDatasourceHadoopRelation.scala    | 13 +----
 .../sql/execution/command/DDLStrategy.scala     |  5 +-
 .../execution/command/carbonTableSchema.scala   |  3 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    | 60 ++++++++++++++++++++
 .../spark/sql/hive/CarbonSessionState.scala     | 30 +++++++++-
 8 files changed, 163 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index e84e62a..0b491bf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -210,6 +210,32 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     )
   }
 
+  test("insert select from same table") {
+    val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+
+    sql("drop table if exists CarbonDest")
+    sql("create table CarbonDest (imei string,deviceInformationId int,MAC string,deviceColor
string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit
string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels
string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string,
deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string,
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string,
ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion
string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string,
Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion
string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string,
Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district
string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion
string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string,
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion
string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription
string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'")
+
+    sql("drop table if exists HiveDest")
+    sql("create table HiveDest (imei string,deviceInformationId int,MAC string,deviceColor
string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit
string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels
string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string,
deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string,
deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string,
ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion
string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string,
Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion
string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string,
Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district
string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion
string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string,
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion
string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription
string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY
','")
+
+    sql("insert into CarbonDest select * from THive")
+    sql("insert into CarbonDest select * from CarbonDest")
+
+    sql("insert into HiveDest select * from THive")
+    sql("insert into HiveDest select * from HiveDest")
+
+    checkAnswer(
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
from HiveDest order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"),
+      sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription
from CarbonDest order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+    )
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
timeStampPropOrig)
+  }
+
+
+
   override def afterAll {
     sql("drop table if exists load")
     sql("drop table if exists inser")
@@ -219,5 +245,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
{
     sql("drop table if exists TCarbonSource")
     sql("drop table if exists loadtable")
     sql("drop table if exists insertTable")
+    sql("drop table if exists CarbonDest")
+    sql("drop table if exists HiveDest")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index f22e958..d23b18f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -33,31 +33,32 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
  * Insert into carbon table from other source
  */
 object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
+    // Wait until children are resolved.
+    case p: LogicalPlan if !p.childrenResolved => p
 
-      case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-           if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
-        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
-    }
+    case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+      if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
+      castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
+  }
 
-    def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
-      : LogicalPlan = {
-      if (relation.carbonRelation.output.size > CarbonCommonConstants
-        .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-        sys
-          .error("Maximum supported column by carbon is:" + CarbonCommonConstants
-            .DEFAULT_MAX_NUMBER_OF_COLUMNS
-          )
-      }
-      if (child.output.size >= relation.carbonRelation.output.size ) {
-        InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
-      } else {
-         sys.error("Cannot insert into target table because column number are different")
-      }
+  def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
+  : LogicalPlan = {
+    if (relation.carbonRelation.output.size > CarbonCommonConstants
+      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      sys
+        .error("Maximum supported column by carbon is:" + CarbonCommonConstants
+          .DEFAULT_MAX_NUMBER_OF_COLUMNS
+        )
+    }
+    if (child.output.size >= relation.carbonRelation.output.size ) {
+      InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
+    } else {
+      sys.error("Cannot insert into target table because column number are different")
     }
   }
+}
+
 
 object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index d94489b..4070088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -75,3 +75,22 @@ case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier)
   override def output: Seq[AttributeReference] =
     Seq(AttributeReference("result", StringType, nullable = false)())
 }
+
+/**
+ * A logical plan representing insertion into Hive table
+ * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable
+ * because Hive Table doesn't have nullability for ARRAY, MAP,STRUCT types.
+ */
+case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
+    partition: Map[String, Option[String]],
+    child: LogicalPlan,
+    overwrite: OverwriteOptions,
+    ifNotExists: Boolean)
+  extends Command {
+
+    override def output: Seq[Attribute] = Seq.empty
+
+    // This is the expected schema of the table prepared to be inserted into
+    // including dynamic partition columns.
+    val tableOutput = table.carbonRelation.output
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 2743e7e..4169ac3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -43,7 +43,7 @@ case class CarbonDatasourceHadoopRelation(
     parameters: Map[String, String],
     tableSchema: Option[StructType],
     isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]())
-  extends BaseRelation with InsertableRelation {
+  extends BaseRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
   lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -74,15 +74,4 @@ case class CarbonDatasourceHadoopRelation(
   }
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
 
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    if (carbonRelation.output.size > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
{
-      sys.error("Maximum supported column by carbon is:" +
-          CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
-    }
-    if(data.logicalPlan.output.size >= carbonRelation.output.size) {
-      LoadTableByInsert(this, data.logicalPlan).run(sparkSession)
-    } else {
-      sys.error("Cannot insert into target table because column number are different")
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 2916a9f..55148d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable,
ShowLoadsCommand, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,6 +54,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             identifier.table.toLowerCase)) :: Nil
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output))
:: Nil
+      case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
+        _, child: LogicalPlan, _, _) =>
+        ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
         CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
         ExecutedCommandExec(createDb) :: Nil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 4bd0564..77a0d90 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -280,7 +280,8 @@ object LoadTable {
 
 }
 
-case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
{
+case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
+  extends RunnableCommand {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   def run(sparkSession: SparkSession): Seq[Row] = {
     val df = Dataset.ofRows(sparkSession, child)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
new file mode 100644
index 0000000..45accac
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+
+/**
+ * Insert into carbon table from other source
+ */
+object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+        if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation],
child)
+    }
+  }
+
+  def castChildOutput(p: InsertIntoTable,
+      relation: CarbonDatasourceHadoopRelation,
+      child: LogicalPlan)
+  : LogicalPlan = {
+    if (relation.carbonRelation.output.size > CarbonCommonConstants
+      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+      sys
+        .error("Maximum supported column by carbon is:" + CarbonCommonConstants
+          .DEFAULT_MAX_NUMBER_OF_COLUMNS
+        )
+    }
+    if (child.output.size >= relation.carbonRelation.output.size) {
+      InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
+    } else {
+      sys.error("Cannot insert into target table because column number are different")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index d81fc09..38c7f34 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,18 +16,22 @@
  */
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
-import org.apache.spark.sql.execution.{CarbonLateDecodeStrategy, SparkOptimizer}
+import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.command.DDLStrategy
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.SparkSession
 
 /**
  * Session state implementation to override sql parser and adding strategies
@@ -42,6 +46,26 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
   experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
 
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override lazy val analyzer: Analyzer = {
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules =
+        catalog.ParquetConversions ::
+        catalog.OrcConversions ::
+        CarbonPreInsertionCasts ::
+        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableInsertion(conf) ::
+        DataSourceAnalysis(conf) ::
+        (if (conf.runSQLonFile) {
+          new ResolveDataSource(sparkSession) :: Nil
+        } else {
+          Nil
+        })
+
+      override val extendedCheckRules = Seq(
+        PreWriteCheck(conf, catalog))
+    }
+  }
 }
 
 class CarbonOptimizer(


Mime
View raw message