From: qiangcai@apache.org
To: commits@carbondata.apache.org
Reply-To: dev@carbondata.apache.org
Date: Wed, 27 Sep 2017 06:34:07 -0000
Message-Id: <2cf6030bbce24d6ab41bf376220b4a70@git.apache.org>
In-Reply-To: <9efd5e4631674b4481eef8a8c47c3d1d@git.apache.org>
Subject: [4/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala deleted file mode 100644 index 7ed280e..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ /dev/null @@ -1,1315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.sql.execution.command - -import java.text.SimpleDateFormat -import java.util - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import scala.language.implicitConversions - -import org.apache.commons.lang3.StringUtils -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation} -import org.apache.spark.util.{AlterTableUtil, CausedBy, FileUtils, PartitionUtils} -import org.codehaus.jackson.map.ObjectMapper - -import org.apache.carbondata.api.CarbonStore -import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.CacheProvider -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.dictionary.server.DictionaryServer -import org.apache.carbondata.core.exception.InvalidConfigurationException -import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.PartitionInfo -import org.apache.carbondata.core.metadata.schema.partition.PartitionType -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension -import org.apache.carbondata.core.metadata.schema.table.TableInfo -import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.format -import org.apache.carbondata.processing.constants.TableOptionConstant -import org.apache.carbondata.processing.etl.DataLoadingException -import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants -import org.apache.carbondata.processing.newflow.exception.NoRetryException -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.load.ValidateUtil -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel} -import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil} - -object Checker { - def validateTableExists( - dbName: Option[String], - tableName: String, - session: SparkSession): Unit = { - val identifier = TableIdentifier(tableName, dbName) - if (!CarbonEnv.getInstance(session).carbonMetastore.tableExists(identifier)(session)) { - val err = s"table $dbName.$tableName not found" - LogServiceFactory.getLogService(this.getClass.getName).error(err) - throw new IllegalArgumentException(err) - } - } -} - -/** - * Interface for 
commands that modify the schema - */ -trait SchemaProcessCommand { - def processSchema(sparkSession: SparkSession): Seq[Row] -} - -/** - * Interface for commands that need to process data in the file system - */ -trait DataProcessCommand { - def processData(sparkSession: SparkSession): Seq[Row] -} - -/** - * Command to show table partitions - * - * @param tableIdentifier - */ -private[sql] case class ShowCarbonPartitionsCommand( - tableIdentifier: TableIdentifier) extends RunnableCommand with SchemaProcessCommand { - - override val output = CommonUtil.partitionInfoOutput - override def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(tableIdentifier)(sparkSession). - asInstanceOf[CarbonRelation] - val carbonTable = relation.tableMeta.carbonTable - val tableName = carbonTable.getFactTableName - val partitionInfo = carbonTable.getPartitionInfo( - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) - if (partitionInfo == null) { - throw new AnalysisException( - s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName") - } - val partitionType = partitionInfo.getPartitionType - val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName - val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName) - LOGGER.info("partition column name:" + columnName) - CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) - } -} - -/** - * Command for the compaction in alter table command - * - * @param alterTableModel - */ -case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand - with DataProcessCommand { - - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - val tableName = alterTableModel.tableName.toLowerCase - val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) - val relation = - CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(databaseName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] - if (relation == null) { - sys.error(s"Table $databaseName.$tableName does not exist") - } - if (null == relation.tableMeta.carbonTable) { - LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName") - sys.error(s"alter table failed. table not found: $databaseName.$tableName") - } - - val carbonLoadModel = new CarbonLoadModel() - - val table = relation.tableMeta.carbonTable - carbonLoadModel.setTableName(table.getFactTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) - - var storeLocation = CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH, - System.getProperty("java.io.tmpdir") - ) - storeLocation = storeLocation + "/carbonstore/" + System.nanoTime() - try { - CarbonDataRDDFactory - .alterTableForCompaction(sparkSession.sqlContext, - alterTableModel, - carbonLoadModel, - storeLocation - ) - } catch { - case e: Exception => - if (null != e.getMessage) { - sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }") - } else { - sys.error("Exception in compaction. Please check logs for more info.") - } - } - Seq.empty - } -} - -/** - * Command for Alter Table Add & Split partition - * Add is a special case of splitting the default partition (part0) - * @param splitPartitionModel - */ -case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitPartitionModel) - extends RunnableCommand with DataProcessCommand with SchemaProcessCommand { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val tableName = splitPartitionModel.tableName - val splitInfo = splitPartitionModel.splitInfo - val partitionId = splitPartitionModel.partitionId.toInt - var partitionInfo: PartitionInfo = null - var carbonMetaStore: CarbonMetaStore = null - var relation: CarbonRelation = null - var dbName: String = null - var storePath: String = null - var table: CarbonTable = null - var carbonTableIdentifier: CarbonTableIdentifier = null - val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]() - val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, - LockUsage.COMPACTION_LOCK, - LockUsage.DELETE_SEGMENT_LOCK, - LockUsage.DROP_TABLE_LOCK, - LockUsage.CLEAN_FILES_LOCK, - LockUsage.ALTER_PARTITION_LOCK) - - // TODO will add rollback function in case of data processing failure - def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - processData(sparkSession) - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - dbName = splitPartitionModel.databaseName - .getOrElse(sparkSession.catalog.currentDatabase) - carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore - relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] - carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier - storePath = relation.tableMeta.storePath - if (relation == null) { - sys.error(s"Table $dbName.$tableName does not exist") - } - carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath) - if (null ==
CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) { - LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName") - sys.error(s"Alter table failed. table not found: $dbName.$tableName") - } - table = relation.tableMeta.carbonTable - partitionInfo = table.getPartitionInfo(tableName) - val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList - // keep a copy of partitionIdList before update partitionInfo. - // will be used in partition data scan - oldPartitionIds.addAll(partitionIds.asJava) - - if (partitionInfo == null) { - sys.error(s"Table $tableName is not a partition table.") - } - if (partitionInfo.getPartitionType == PartitionType.HASH) { - sys.error(s"Hash partition table cannot be added or split!") - } - PartitionUtils.updatePartitionInfo(partitionInfo, partitionIds, partitionId, - splitInfo, timestampFormatter, dateFormatter) - - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaFilePath = carbonTablePath.getSchemaFilePath - // read TableInfo - val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession) - val schemaConverter = new ThriftWrapperSchemaConverterImpl() - val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, - dbName, tableName, storePath) - val tableSchema = wrapperTableInfo.getFactTable - tableSchema.setPartitionInfo(partitionInfo) - wrapperTableInfo.setFactTable(tableSchema) - wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis()) - val thriftTable = - schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable, - dbName, tableName, storePath) - CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable) - // update the schema modified time - carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath) - sparkSession.catalog.refreshTable(tableName) - Seq.empty - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - var locks = List.empty[ICarbonLock] - var success = false - try { - locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, - locksToBeAcquired)(sparkSession) - val carbonLoadModel = new CarbonLoadModel() - val dataLoadSchema = new CarbonDataLoadSchema(table) - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - carbonLoadModel.setTableName(carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(storePath) - val loadStartTime = CarbonUpdateUtil.readCurrentTime - carbonLoadModel.setFactTimeStamp(loadStartTime) - CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext, - partitionId.toString, - carbonLoadModel, - oldPartitionIds.asScala.toList - ) - success = true - } catch { - case e: Exception => - success = false - sys.error(s"Add/Split Partition failed. Please check logs for more info. 
${ e.getMessage }") - } finally { - AlterTableUtil.releaseLocks(locks) - CacheProvider.getInstance().dropAllCache() - LOGGER.info("Locks released after alter table add/split partition action.") - LOGGER.audit("Locks released after alter table add/split partition action.") - if (success) { - LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName") - LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName") - } - } - Seq.empty - } -} - -case class AlterTableDropPartition(alterTableDropPartitionModel: AlterTableDropPartitionModel) - extends RunnableCommand with DataProcessCommand with SchemaProcessCommand { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val tableName = alterTableDropPartitionModel.tableName - var dbName: String = null - val partitionId = alterTableDropPartitionModel.partitionId - val dropWithData = alterTableDropPartitionModel.dropWithData - if (partitionId == 0 ) { - sys.error(s"Cannot drop default partition! Please use delete statement!") - } - var partitionInfo: PartitionInfo = null - var carbonMetaStore: CarbonMetaStore = null - var relation: CarbonRelation = null - var storePath: String = null - var table: CarbonTable = null - var carbonTableIdentifier: CarbonTableIdentifier = null - val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]() - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, - LockUsage.COMPACTION_LOCK, - LockUsage.DELETE_SEGMENT_LOCK, - LockUsage.DROP_TABLE_LOCK, - LockUsage.CLEAN_FILES_LOCK, - LockUsage.ALTER_PARTITION_LOCK) - - def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - processData(sparkSession) - Seq.empty - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - dbName = alterTableDropPartitionModel.databaseName - .getOrElse(sparkSession.catalog.currentDatabase) - carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore - relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession) - .asInstanceOf[CarbonRelation] - carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier - storePath = relation.tableMeta.storePath - carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath) - if (relation == null) { - sys.error(s"Table $dbName.$tableName does not exist") - } - if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) { - LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName") - sys.error(s"Alter table failed. table not found: $dbName.$tableName") - } - table = relation.tableMeta.carbonTable - partitionInfo = table.getPartitionInfo(tableName) - if (partitionInfo == null) { - sys.error(s"Table $tableName is not a partition table.") - } - val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList - // keep a copy of partitionIdList before update partitionInfo. 
- // will be used in partition data scan - oldPartitionIds.addAll(partitionIds.asJava) - val partitionIndex = partitionIds.indexOf(Integer.valueOf(partitionId)) - partitionInfo.getPartitionType match { - case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!") - case PartitionType.RANGE => - val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo) - val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1) - rangeInfo.remove(rangeToRemove) - partitionInfo.setRangeInfo(rangeInfo) - case PartitionType.LIST => - val listInfo = new util.ArrayList(partitionInfo.getListInfo) - val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1) - listInfo.remove(listToRemove) - partitionInfo.setListInfo(listInfo) - case PartitionType.RANGE_INTERVAL => - sys.error(s"Dropping range interval partition isn't supported yet!") - } - partitionInfo.dropPartition(partitionIndex) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaFilePath = carbonTablePath.getSchemaFilePath - // read TableInfo - val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession) - - val schemaConverter = new ThriftWrapperSchemaConverterImpl() - val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, - dbName, tableName, storePath) - val tableSchema = wrapperTableInfo.getFactTable - tableSchema.setPartitionInfo(partitionInfo) - wrapperTableInfo.setFactTable(tableSchema) - wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis()) - val thriftTable = - schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - .setTime_stamp(System.currentTimeMillis) - carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable, - dbName, tableName, storePath) - CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable) - // update the schema modified time - carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath) - // sparkSession.catalog.refreshTable(tableName) - Seq.empty - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - var locks = List.empty[ICarbonLock] - var success = false - try { - locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName, - locksToBeAcquired)(sparkSession) - val carbonLoadModel = new CarbonLoadModel() - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - carbonLoadModel.setTableName(carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(storePath) - val loadStartTime = CarbonUpdateUtil.readCurrentTime - carbonLoadModel.setFactTimeStamp(loadStartTime) - CarbonDataRDDFactory.alterTableDropPartition(sparkSession.sqlContext, - partitionId, - carbonLoadModel, - dropWithData, - oldPartitionIds.asScala.toList - ) - success = true - } catch { - case e: Exception => - sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ") - success = false - } finally { - CacheProvider.getInstance().dropAllCache() - AlterTableUtil.releaseLocks(locks) - LOGGER.info("Locks released after alter table drop partition action.") - LOGGER.audit("Locks released after alter table drop partition action.") - } - LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName") - LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName") - Seq.empty - } -} - - case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand - with SchemaProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - } - - def setV(ref: Any, name: String, value: Any): Unit = { - ref.getClass.getFields.find(_.getName == name).get - .set(ref, value.asInstanceOf[AnyRef]) - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val storePath = CarbonEnv.getInstance(sparkSession).storePath - CarbonEnv.getInstance(sparkSession).carbonMetastore. - checkSchemasModifiedTimeAndReloadTables(storePath) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession) - val tbName = cm.tableName - val dbName = cm.databaseName - LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]") - - val tableInfo: TableInfo = TableNewProcessor(cm) - - // Add validation for sort scope when create table - val sortScope = tableInfo.getFactTable.getTableProperties - .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) - if (!CarbonUtil.isValidSortOption(sortScope)) { - throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," + - s" valid SORT_SCOPE values are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ") - } - - if (tableInfo.getFactTable.getListOfColumns.size <= 0) { - sys.error("No Dimensions found. Table should have at least one dimension!") - } - - if (sparkSession.sessionState.catalog.listTables(dbName) - .exists(_.table.equalsIgnoreCase(tbName))) { - if (!cm.ifNotExistsSet) { - LOGGER.audit( - s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " + - s"Table [$tbName] already exists under database [$dbName]") - sys.error(s"Table [$tbName] already exists under database [$dbName]") - } - } else { - val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName) - // Add Database to catalog and persist - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val tablePath = tableIdentifier.getTablePath - val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath) - if (createDSTable) { - try { - val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) - cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) - cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) - - sparkSession.sql( - s"""CREATE TABLE $dbName.$tbName - |(${ fields.map(f => f.rawSchema).mkString(",") }) - |USING org.apache.spark.sql.CarbonSource""".stripMargin + - s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + - s""""$tablePath"$carbonSchemaString) """) - } catch { - case e: Exception => - val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) - // call the drop table to delete the created table.
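The catch handler here undoes a half-finished CREATE: the carbon schema has already been written, so if registering the datasource table with Spark fails, the command drops what it wrote before rethrowing. A minimal sketch of that compensating-cleanup idiom, with hypothetical helper names (writeSchemaFile, registerDatasourceTable and dropSchemaFile are illustrative stand-ins, not CarbonData APIs):

  object CreateTableCleanupSketch {
    // hypothetical stand-ins for the metastore and catalog calls
    private def writeSchemaFile(db: String, table: String): Unit = println(s"write schema for $db.$table")
    private def registerDatasourceTable(db: String, table: String): Unit = println(s"register $db.$table")
    private def dropSchemaFile(db: String, table: String): Unit = println(s"drop schema for $db.$table")

    def create(db: String, table: String): Unit = {
      writeSchemaFile(db, table)
      try {
        registerDatasourceTable(db, table)
      } catch {
        case e: Exception =>
          dropSchemaFile(db, table) // compensate: remove the half-created table
          throw e                   // then surface the original failure
      }
    }
  }
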
- CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(tablePath, identifier)(sparkSession) - - LOGGER.audit(s"Table creation with Database name [$dbName] " + - s"and Table name [$tbName] failed") - throw e - } - } - - LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]") - } - Seq.empty - } -} - -case class DeleteLoadsById( - loadids: Seq[String], - databaseNameOp: Option[String], - tableName: String) extends RunnableCommand with DataProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable - CarbonStore.deleteLoadById( - loadids, - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - carbonTable - ) - Seq.empty - } -} - -case class DeleteLoadsByLoadDate( - databaseNameOp: Option[String], - tableName: String, - dateField: String, - loadDate: String) extends RunnableCommand with DataProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. - tableMeta.carbonTable - CarbonStore.deleteLoadByDate( - loadDate, - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - carbonTable - ) - Seq.empty - } -} - -object LoadTable { - - def updateTableMetadata(carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - model: DictionaryLoadModel, - noDictDimension: Array[CarbonDimension]): Unit = { - val sparkSession = sqlContext.sparkSession - val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation, - model.table) - - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - // read TableInfo - val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - - // modify TableInfo - val columns = tableInfo.getFact_table.getTable_columns - for (i <- 0 until columns.size) { - if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) { - columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY) - } - } - val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - entry.setTime_stamp(System.currentTimeMillis()) - - // write TableInfo - metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier, - carbonTablePath.getCarbonTableIdentifier, - tableInfo, entry, carbonTablePath.getPath)(sparkSession) - - // update the schema modified time - metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation) - - // update CarbonDataLoadSchema - val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName), - model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta - .carbonTable - carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable)) - } - -} - -case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, - child: LogicalPlan, - overwrite: Boolean) - extends RunnableCommand with DataProcessCommand { - - 
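LoadTableByInsert, whose body follows, shows how INSERT INTO is reduced to a LOAD: the child logical plan is materialized as a DataFrame and handed to LoadTable, with a fileheader synthesized from the relation's schema so the loader maps DataFrame columns to table columns by position. A condensed sketch of that reduction, assuming a caller-supplied runLoad function standing in for LoadTable(...).run(...):

  import org.apache.spark.sql.{DataFrame, SparkSession}

  def insertAsLoad(spark: SparkSession, db: String, table: String, df: DataFrame,
      runLoad: (String, String, String, DataFrame) => Unit): Unit = {
    // the synthesized header tells the loader how DataFrame columns map to table columns
    val header = df.schema.fields.map(_.name).mkString(",")
    runLoad(db, table, header, df)
  }
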
def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - val df = Dataset.ofRows(sparkSession, child) - val header = relation.tableSchema.get.fields.map(_.name).mkString(",") - val load = LoadTable( - Some(relation.carbonRelation.databaseName), - relation.carbonRelation.tableName, - null, - Seq(), - scala.collection.immutable.Map("fileheader" -> header), - overwrite, - null, - Some(df)).run(sparkSession) - // updating relation metadata. This is in case of auto detect high cardinality - relation.carbonRelation.metaData = - CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable) - load - } -} - -case class LoadTable( - databaseNameOp: Option[String], - tableName: String, - factPathFromUser: String, - dimFilesPath: Seq[DataLoadTableFileMapping], - options: scala.collection.immutable.Map[String, String], - isOverwriteTable: Boolean, - var inputSqlString: String = null, - dataFrame: Option[DataFrame] = None, - updateModel: Option[UpdateTableModel] = None) extends RunnableCommand with DataProcessCommand { - - private def getFinalOptions(carbonProperty: CarbonProperties): - scala.collection.mutable.Map[String, String] = { - var optionsFinal = scala.collection.mutable.Map[String, String]() - optionsFinal.put("delimiter", options.getOrElse("delimiter", ",")) - optionsFinal.put("quotechar", options.getOrElse("quotechar", "\"")) - optionsFinal.put("fileheader", options.getOrElse("fileheader", "")) - optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\")) - optionsFinal.put("commentchar", options.getOrElse("commentchar", "#")) - optionsFinal.put("columndict", options.getOrElse("columndict", null)) - optionsFinal - .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N")) - optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable", - carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))) - val badRecordActionValue = carbonProperty - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - badRecordActionValue))) - optionsFinal - .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))) - optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", "")) - optionsFinal - .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$")) - optionsFinal - .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:")) - optionsFinal.put("dateformat", options.getOrElse("dateformat", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))) - - optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions", - carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))) - - optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null)) - - 
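Every put above follows the same precedence chain: an option supplied in the LOAD DATA statement wins, otherwise the corresponding carbon property is consulted, otherwise a hard-coded default applies. The chain distilled into a sketch, with plain Maps standing in for the options map and CarbonProperties:

  def resolveOption(
      key: String,
      userOptions: Map[String, String],
      carbonProps: Map[String, String],
      default: String): String =
    userOptions.getOrElse(key, carbonProps.getOrElse(key, default))

  // e.g. resolveOption("bad_records_action", Map.empty,
  //        Map("bad_records_action" -> "REDIRECT"), "FAIL") returns "REDIRECT"
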
optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))) - - optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))) - - val useOnePass = options.getOrElse("single_pass", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { - case "true" => - true - case "false" => - // when single_pass = false, if either all_dictionary_path - // or columndict is configured, then do not allow the load - if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) || - StringUtils.isNotEmpty(optionsFinal("columndict"))) { - throw new MalformedCarbonCommandException( - "Cannot use all_dictionary_path or columndict without single_pass.") - } else { - false - } - case illegal => - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - LOGGER.error(s"Can't use single_pass, because an illegal value was found: [" + illegal + "] " + - "Please set it to 'true' or 'false'") - false - } - optionsFinal.put("single_pass", useOnePass.toString) - optionsFinal - } - - private def checkDefaultValue(value: String, default: String) = { - if (StringUtils.isEmpty(value)) { - default - } else { - value - } - } - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - if (dataFrame.isDefined && updateModel.isEmpty) { - val rdd = dataFrame.get.rdd - if (rdd.partitions == null || rdd.partitions.length == 0) { - LOGGER.warn("DataLoading finished. No data was loaded.") - return Seq.empty - } - } - - val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase) - - val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - if (relation == null) { - sys.error(s"Table $dbName.$tableName does not exist") - } - if (null == relation.tableMeta.carbonTable) { - LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName") - LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName") - sys.error(s"Data loading failed. table not found: $dbName.$tableName") - } - - val carbonProperty: CarbonProperties = CarbonProperties.getInstance() - carbonProperty.addProperty("zookeeper.enable.lock", "false") - val optionsFinal = getFinalOptions(carbonProperty) - - val tableProperties = relation.tableMeta.carbonTable.getTableInfo - .getFactTable.getTableProperties - - optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) - - try { - val factPath = if (dataFrame.isDefined) { - "" - } else { - FileUtils.getPaths( - CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser)) - } - val carbonLoadModel = new CarbonLoadModel() - carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName) - carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath) - - val table = relation.tableMeta.carbonTable - carbonLoadModel.setTableName(table.getFactTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - - val partitionLocation = relation.tableMeta.storePath + "/partition/" + - relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" + - relation.tableMeta.carbonTableIdentifier.getTableName + "/" - val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean - val sort_scope = optionsFinal("sort_scope") - val single_pass = optionsFinal("single_pass") - val bad_records_logger_enable = optionsFinal("bad_records_logger_enable") - val bad_records_action = optionsFinal("bad_records_action") - val bad_record_path = optionsFinal("bad_record_path") - val global_sort_partitions = optionsFinal("global_sort_partitions") - val dateFormat = optionsFinal("dateformat") - val delimeter = optionsFinal("delimiter") - val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1") - val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2") - val all_dictionary_path = optionsFinal("all_dictionary_path") - val column_dict = optionsFinal("columndict") - ValidateUtil.validateDateFormat(dateFormat, table, tableName) - ValidateUtil.validateSortScope(table, sort_scope) - - if (bad_records_logger_enable.toBoolean || - LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { - if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { - sys.error("Invalid bad records location.") - } - } - carbonLoadModel.setBadRecordsLocation(bad_record_path) - - ValidateUtil.validateGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#")) - - // if there is no file header in the CSV file and the load SQL doesn't provide the FILEHEADER option, - // we should use the table schema to generate the file header.
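The branch that follows enforces a small contract between the 'header' and 'fileheader' options: header=true forbids an explicit fileheader, and header=false derives one from the table schema when none was given. Distilled into a pure function, a sketch under the assumption that the 'header' option has already been parsed to a Boolean (names are illustrative):

  def resolveFileHeader(hasHeader: Boolean, fileHeader: String,
      schemaColumns: Seq[String]): String =
    if (hasHeader && fileHeader.nonEmpty) {
      throw new IllegalArgumentException(
        "When 'header' option is true, 'fileheader' option is not required.")
    } else if (!hasHeader && fileHeader.isEmpty) {
      schemaColumns.mkString(",") // generate the header from the table schema
    } else {
      fileHeader
    }
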
- var fileHeader = optionsFinal("fileheader") - val headerOption = options.get("header") - if (headerOption.isDefined) { - // whether the csv file has a file header - // the default value is true - val header = try { - headerOption.get.toBoolean - } catch { - case ex: IllegalArgumentException => - throw new MalformedCarbonCommandException( - "'header' option should be either 'true' or 'false'. " + ex.getMessage) - } - header match { - case true => - if (fileHeader.nonEmpty) { - throw new MalformedCarbonCommandException( - "When 'header' option is true, 'fileheader' option is not required.") - } - case false => - // generate file header - if (fileHeader.isEmpty) { - fileHeader = table.getCreateOrderColumn(table.getFactTableName) - .asScala.map(_.getColName).mkString(",") - } - } - } - - carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) - carbonLoadModel - .setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + - optionsFinal("serialization_null_format")) - carbonLoadModel - .setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) - carbonLoadModel - .setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action) - carbonLoadModel - .setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + - optionsFinal("is_empty_data_bad_record")) - carbonLoadModel.setSortScope(sort_scope) - carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb")) - carbonLoadModel.setGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setUseOnePass(single_pass.toBoolean) - if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || - complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) || - delimeter.equalsIgnoreCase(complex_delimeter_level2)) { - sys.error(s"Field Delimiter & Complex types delimiter are the same") - } - else { - carbonLoadModel.setComplexDelimiterLevel1( - CarbonUtil.delimiterConverter(complex_delimeter_level1)) - carbonLoadModel.setComplexDelimiterLevel2( - CarbonUtil.delimiterConverter(complex_delimeter_level2)) - } - // set local dictionary path, and dictionary file extension - carbonLoadModel.setAllDictPath(all_dictionary_path) - - val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS - try { - // First, the system has to partition the data and then call the data load - LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") - carbonLoadModel.setFactFilePath(factPath) - carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) - carbonLoadModel.setCsvHeader(fileHeader) - carbonLoadModel.setColDictFilePath(column_dict) - carbonLoadModel.setDirectLoad(true) - carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) - val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns, - optionsFinal("maxcolumns")) - carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) - GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata - val storePath = relation.tableMeta.storePath - // add the start entry for the new load in the table status file - if
(!updateModel.isDefined) { - CommonUtil. - readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable) - } - if (isOverwriteTable) { - LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") - } - if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - } - if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass && - StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) { - LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load") - LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load") - carbonLoadModel.setUseOnePass(false) - } - // Create table and metadata folders if not exist - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(storePath, table.getCarbonTableIdentifier) - val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath - val fileType = FileFactory.getFileType(metadataDirectoryPath) - if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { - FileFactory.mkdirs(metadataDirectoryPath, fileType) - } - if (carbonLoadModel.getUseOnePass) { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier - .getCarbonTableIdentifier - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier) - val dictFolderPath = carbonTablePath.getMetadataDirectoryPath - val dimensions = carbonTable.getDimensionByTableName( - carbonTable.getFactTableName).asScala.toArray - val colDictFilePath = carbonLoadModel.getColDictFilePath - if (!StringUtils.isEmpty(colDictFilePath)) { - carbonLoadModel.initPredefDictMap() - // generate predefined dictionary - GlobalDictionaryUtil - .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier, - dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath) - } - if (!StringUtils.isEmpty(all_dictionary_path)) { - carbonLoadModel.initPredefDictMap() - GlobalDictionaryUtil - .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext, - carbonLoadModel, - storePath, - carbonTableIdentifier, - dictFolderPath, - dimensions, - all_dictionary_path) - } - // dictionaryServerClient dictionary generator - val dictionaryServerPort = carbonProperty - .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT, - CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT) - val sparkDriverHost = sparkSession.sqlContext.sparkContext. - getConf.get("spark.driver.host") - carbonLoadModel.setDictionaryServerHost(sparkDriverHost) - // start dictionary server when use one pass load and dimension with DICTIONARY - // encoding is present. 
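The exists-check that follows decides whether a dictionary server must be started: only dimensions carrying a generated DICTIONARY encoding, and not DIRECT_DICTIONARY (which covers date/timestamp columns and needs no lookup service), require one. The predicate in isolation, a sketch with a minimal stand-in for the real Encoding enum and CarbonDimension type:

  object Enc extends Enumeration { val DICTIONARY, DIRECT_DICTIONARY = Value }
  final case class Dim(name: String, encodings: Set[Enc.Value])

  // a server is only needed when some dimension's dictionary must be generated
  def needsDictionaryServer(dims: Seq[Dim]): Boolean =
    dims.exists(d => d.encodings(Enc.DICTIONARY) && !d.encodings(Enc.DIRECT_DICTIONARY))
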
- val allDimensions = table.getAllDimensions.asScala.toList - val createDictionary = allDimensions.exists { - carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) - } - val server: Option[DictionaryServer] = if (createDictionary) { - val dictionaryServer = DictionaryServer - .getInstance(dictionaryServerPort.toInt, carbonTable) - carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort) - sparkSession.sparkContext.addSparkListener(new SparkListener() { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { - dictionaryServer.shutdown() - } - }) - Some(dictionaryServer) - } else { - None - } - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - columnar, - partitionStatus, - server, - isOverwriteTable, - dataFrame, - updateModel) - } else { - val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) { - val fields = dataFrame.get.schema.fields - import org.apache.spark.sql.functions.udf - // extracting only segment from tupleId - val getSegIdUDF = udf((tupleId: String) => - CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID)) - // getting all fields except tupleId field as it is not required in the value - var otherFields = fields.toSeq - .filter(field => !field.name - .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) - .map(field => new Column(field.name)) - - // extract tupleId field which will be used as a key - val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute - .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))). - as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) - // use dataFrameWithoutTupleId as dictionaryDataFrame - val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*) - otherFields = otherFields :+ segIdColumn - // use dataFrameWithTupleId as loadDataFrame - val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*) - (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId)) - } else { - (dataFrame, dataFrame) - } - - GlobalDictionaryUtil.generateGlobalDictionary( - sparkSession.sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - dictionaryDataFrame) - CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, - carbonLoadModel, - relation.tableMeta.storePath, - columnar, - partitionStatus, - None, - isOverwriteTable, - loadDataFrame, - updateModel) - } - } catch { - case CausedBy(ex: NoRetryException) => - LOGGER.error(ex, s"Dataload failure for $dbName.$tableName") - throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") - case ex: Exception => - LOGGER.error(ex) - LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs") - throw ex - } finally { - // Once the data load is successful delete the unwanted partition files - try { - val fileType = FileFactory.getFileType(partitionLocation) - if (FileFactory.isFileExist(partitionLocation, fileType)) { - val file = FileFactory - .getCarbonFile(partitionLocation, fileType) - CarbonUtil.deleteFoldersAndFiles(file) - } - } catch { - case ex: Exception => - LOGGER.error(ex) - LOGGER.audit(s"Dataload failure for $dbName.$tableName. " + - "Problem deleting the partition folder") - throw ex - } - - } - } catch { - case dle: DataLoadingException => - LOGGER.audit(s"Dataload failed for $dbName.$tableName. 
" + dle.getMessage) - throw dle - case mce: MalformedCarbonCommandException => - LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage) - throw mce - } - Seq.empty - } -} - -case class CleanFiles( - databaseNameOp: Option[String], - tableName: String, forceTableClean: Boolean = false) - extends RunnableCommand with DataProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - if (forceTableClean) { - CarbonStore.cleanFiles( - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - CarbonEnv.getInstance(sparkSession).storePath, - null, - forceTableClean) - } else { - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val relation = catalog - .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation] - val carbonTable = relation.tableMeta.carbonTable - CarbonStore.cleanFiles( - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - relation.asInstanceOf[CarbonRelation].tableMeta.storePath, - carbonTable, - forceTableClean) - } - Seq.empty - } -} - -case class ShowLoads( - databaseNameOp: Option[String], - tableName: String, - limit: Option[String], - override val output: Seq[Attribute]) extends RunnableCommand with DataProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore. - lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]. 
- tableMeta.carbonTable - CarbonStore.showSegments( - getDB.getDatabaseName(databaseNameOp, sparkSession), - tableName, - limit, - carbonTable.getMetaDataFilepath - ) - } -} - -case class CarbonDropTableCommand(ifExistsSet: Boolean, - databaseNameOp: Option[String], - tableName: String) - extends RunnableCommand with SchemaProcessCommand with DataProcessCommand { - - def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - processData(sparkSession) - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) - val identifier = TableIdentifier(tableName, Option(dbName)) - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "") - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) - val carbonEnv = CarbonEnv.getInstance(sparkSession) - val catalog = carbonEnv.carbonMetastore - val storePath = carbonEnv.storePath - val tableIdentifier = - AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, - dbName.toLowerCase, tableName.toLowerCase) - catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath) - val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() - try { - locksToBeAcquired foreach { - lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock) - } - LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") - - CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession) - LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]") - } catch { - case ex: Exception => - LOGGER.error(ex, s"Dropping table $dbName.$tableName failed") - sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}") - } finally { - if (carbonLocks.nonEmpty) { - val unlocked = carbonLocks.forall(_.unlock()) - if (unlocked) { - logInfo("Table MetaData Unlocked Successfully") - } - } - } - Seq.empty - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - // delete the table folder - val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession) - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val tableIdentifier = - AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName) - val metadataFilePath = - CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath - val fileType = FileFactory.getFileType(metadataFilePath) - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) - } - Seq.empty - } -} - -private[sql] case class DescribeCommandFormatted( - child: SparkPlan, - override val output: Seq[Attribute], - tblIdentifier: TableIdentifier) - extends RunnableCommand with SchemaProcessCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - processSchema(sparkSession) - } - - private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = { - var results: Seq[(String, String, String)] = - Seq(("", "", ""), ("##Column Group Information", "", "")) - val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter { - case (groupId, _) => groupId != -1 - }.toSeq.sortBy(_._1) - val groups = groupedDimensions.map(colGroups => { - 
colGroups._2.map(dim => dim.getColName).mkString(", ") - }) - var index = 1 - groups.foreach { x => - results = results :+ (s"Column Group $index", x, "") - index = index + 1 - } - results - } - - override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore - .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation] - val mapper = new ObjectMapper() - val colProps = StringBuilder.newBuilder - val dims = relation.metaData.dims.map(x => x.toLowerCase) - var results: Seq[(String, String, String)] = child.schema.fields.map { field => - val fieldName = field.name.toLowerCase - val comment = if (dims.contains(fieldName)) { - val dimension = relation.metaData.carbonTable.getDimensionByName( - relation.tableMeta.carbonTableIdentifier.getTableName, fieldName) - if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) { - colProps.append(fieldName).append(".") - .append(mapper.writeValueAsString(dimension.getColumnProperties)) - .append(",") - } - if (dimension.hasEncoding(Encoding.DICTIONARY) && - !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { - "DICTIONARY, KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match { - case false => ",NOINVERTEDINDEX" - case _ => "" - }) - } else { - "KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match { - case false => ",NOINVERTEDINDEX" - case _ => "" - }) - } - } else { - "MEASURE" - } - (field.name, field.dataType.simpleString, comment) - } - val colPropStr = if (colProps.toString().trim().length() > 0) { - // drops additional comma at end - colProps.toString().dropRight(1) - } else { - colProps.toString() - } - results ++= Seq(("", "", ""), ("##Detailed Table Information", "", "")) - results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier - .getDatabaseName, "") - ) - results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, "")) - results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, "")) - val carbonTable = relation.tableMeta.carbonTable - results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", "")) - results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable - .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants - .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - results ++= Seq(("", "", ""), ("##Detailed Column property", "", "")) - if (colPropStr.length() > 0) { - results ++= Seq((colPropStr, "", "")) - } else { - results ++= Seq(("ADAPTIVE", "", "")) - } - results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( - relation.tableMeta.carbonTableIdentifier.getTableName).asScala - .map(column => column).mkString(","), "")) - val dimension = carbonTable - .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName) - results ++= getColumnGroups(dimension.asScala.toList) - if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) { - results ++= - Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName) - .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), "")) - } - results.map { case (name, dataType, comment) => - Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s") - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
new file mode 100644
index 0000000..28c53a1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableModel, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/**
+ * Command for the compaction in alter table command
+ */
+case class AlterTableCompactionCommand(
+    alterTableModel: AlterTableModel)
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val tableName = alterTableModel.tableName.toLowerCase
+    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    val relation =
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(databaseName), tableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $databaseName.$tableName does not exist")
+    }
+    if (null == relation.tableMeta.carbonTable) {
+      LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
+      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
+    }
+
+    val carbonLoadModel = new CarbonLoadModel()
+
+    val table = relation.tableMeta.carbonTable
+    carbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
+    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+    carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
+
+    var storeLocation = CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+        System.getProperty("java.io.tmpdir")
+      )
+    storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+    try {
+      CarbonDataRDDFactory
+        .alterTableForCompaction(sparkSession.sqlContext,
+          alterTableModel,
+          carbonLoadModel,
+          storeLocation
+        )
+    } catch {
+      case e: Exception =>
+        if (null != e.getMessage) {
+          sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+        } else {
+          sys.error("Exception in compaction. Please check logs for more info.")
+        }
+    }
+    Seq.empty
+  }
+}
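AlterTableCompactionCommand above is the runnable behind the ALTER TABLE ... COMPACT statement. A minimal usage sketch, with `spark` and the table name `sales` assumed as before:

    // 'MINOR' merges recent small segments; 'MAJOR' merges segments up to the
    // size threshold configured in carbon.properties.
    spark.sql("ALTER TABLE sales COMPACT 'MINOR'")
    spark.sql("ALTER TABLE sales COMPACT 'MAJOR'")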
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
new file mode 100644
index 0000000..2003bb1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class CarbonShowLoadsCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    limit: Option[String],
+    override val output: Seq[Attribute])
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
+    CarbonStore.showSegments(
+      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      limit,
+      carbonTable.getMetaDataFilepath
+    )
+  }
+}
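CarbonShowLoadsCommand above serves SHOW SEGMENTS, returning one row per load (segment) with its id, status and load timestamps. Sketch, with the same assumed `spark` and `sales`:

    // The LIMIT clause is optional and maps to the `limit: Option[String]` field.
    spark.sql("SHOW SEGMENTS FOR TABLE sales LIMIT 10").show()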
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
new file mode 100644
index 0000000..9406335
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class CleanFilesCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    forceTableClean: Boolean = false)
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    if (forceTableClean) {
+      CarbonStore.cleanFiles(
+        GetDB.getDatabaseName(databaseNameOp, sparkSession),
+        tableName,
+        CarbonEnv.getInstance(sparkSession).storePath,
+        null,
+        forceTableClean)
+    } else {
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val relation = catalog
+        .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+      val carbonTable = relation.tableMeta.carbonTable
+      CarbonStore.cleanFiles(
+        GetDB.getDatabaseName(databaseNameOp, sparkSession),
+        tableName,
+        relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+        carbonTable,
+        forceTableClean)
+    }
+    Seq.empty
+  }
+}
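CleanFilesCommand above implements CLEAN FILES, which physically removes the data of segments already marked as deleted or merged away by compaction; the forceTableClean parameter (default false) skips the metastore lookup and cleans against the raw store path. Sketch:

    // Reclaims disk space; active segments are not touched.
    spark.sql("CLEAN FILES FOR TABLE sales")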
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
new file mode 100644
index 0000000..1ea4508
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class DeleteLoadByIdCommand(
+    loadIds: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String) extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
+    CarbonStore.deleteLoadById(
+      loadIds,
+      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      carbonTable
+    )
+    Seq.empty
+  }
+}
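DeleteLoadByIdCommand above marks the given segments for delete; its date-based counterpart, DeleteLoadByLoadDateCommand in the next file, does the same for every segment loaded before a given timestamp. A sketch covering both (segment ids and the date literal are illustrative):

    // Segment ids can be taken from SHOW SEGMENTS output.
    spark.sql("DELETE FROM TABLE sales WHERE SEGMENT.ID IN (0, 1)")
    spark.sql("DELETE FROM TABLE sales WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")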
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
new file mode 100644
index 0000000..3d06b18
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class DeleteLoadByLoadDateCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    dateField: String,
+    loadDate: String)
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+      tableMeta.carbonTable
+    CarbonStore.deleteLoadByDate(
+      loadDate,
+      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      tableName,
+      carbonTable
+    )
+    Seq.empty
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
new file mode 100644
index 0000000..3f0e093
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DataProcessCommand, RunnableCommand}
+
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class LoadTableByInsertCommand(
+    relation: CarbonDatasourceHadoopRelation,
+    child: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand with DataProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val df = Dataset.ofRows(sparkSession, child)
+    val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
+    val load = LoadTableCommand(
+      Some(relation.carbonRelation.databaseName),
+      relation.carbonRelation.tableName,
+      null,
+      Seq(),
+      scala.collection.immutable.Map("fileheader" -> header),
+      overwrite,
+      null,
+      Some(df)).run(sparkSession)
+    // updating relation metadata. This is in case of auto detect high cardinality
+    relation.carbonRelation.metaData =
+      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+    load
+  }
+}
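LoadTableByInsertCommand above is the path INSERT INTO ... SELECT takes on a Carbon table: the child plan is materialised as a DataFrame and handed to LoadTableCommand, with the target column list passed as the 'fileheader' option. Sketch, where the source table `staging_sales` is assumed to exist with a compatible schema:

    // The command's `overwrite` flag corresponds to INSERT OVERWRITE.
    spark.sql("INSERT INTO TABLE sales SELECT * FROM staging_sales")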