From: jackylk@apache.org
To: commits@carbondata.apache.org
Date: Wed, 18 Jul 2018 02:20:15 -0000
Subject: [42/50] [abbrv] carbondata git commit: [CARBONDATA-2613] Support csv based carbon table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2009009a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala
new file mode 100644
index 0000000..e7f6c7f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util.UUID
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * Supports the `ALTER TABLE tableName ADD SEGMENT LOCATION 'path'` command.
+ * It creates a new segment and maps the given data file path to the segment's storage.
+ */
+case class CarbonAddSegmentCommand(
+    dbNameOp: Option[String],
+    tableName: String,
+    filePathFromUser: String,
+    var operationContext: OperationContext = new OperationContext)
+  extends AtomicRunnableCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  var carbonTable: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
+    carbonTable = {
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+      if (relation == null) {
+        LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found")
+        throw new NoSuchTableException(dbName, tableName)
+      }
+      relation.carbonTable
+    }
+
+    if (carbonTable.isHivePartitionTable) {
+      LOGGER.error("Ignore hive partition table for now")
+    }
+
+    operationContext.setProperty("isOverwrite", false)
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
+      val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false)
+      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
+    }
+    Seq.empty
+  }
+
+  // only maps the external files to segment metadata; no data is moved or rewritten
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // clean up invalid segments before creating a new entry
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false, null)
+    val currentLoadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+      CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
+    val newSegmentId = SegmentStatusManager.createNewSegmentId(currentLoadMetadataDetails).toString
+    // create the new segment folder in the carbon store
+    CarbonLoaderUtil.checkAndCreateCarbonDataLocation(newSegmentId, carbonTable)
+
+    val factFilePath = FileUtils.getPaths(filePathFromUser)
+
+    val uuid = if (carbonTable.isChildDataMap) {
+      Option(operationContext.getProperty("uuid")).getOrElse("").toString
+    } else if (carbonTable.hasAggregationDataMap) {
+      UUID.randomUUID().toString
+    } else {
+      ""
+    }
+    // associate the segment metadata with the file path; multiple files are separated by commas
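+    // The load model below is bookkeeping only: recordNewLoadMetadata() further
+    // down reads the segment id, table identity and fact timestamp from it when
+    // writing the new entry into the table status file.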
+    val loadModel: CarbonLoadModel = new CarbonLoadModel
+    loadModel.setSegmentId(newSegmentId)
+    loadModel.setDatabaseName(carbonTable.getDatabaseName)
+    loadModel.setTableName(carbonTable.getTableName)
+    loadModel.setTablePath(carbonTable.getTablePath)
+    loadModel.setCarbonTransactionalTable(carbonTable.isTransactionalTable)
+    loadModel.readAndSetLoadMetadataDetails()
+    loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime())
+    val loadSchema: CarbonDataLoadSchema = new CarbonDataLoadSchema(carbonTable)
+    loadModel.setCarbonDataLoadSchema(loadSchema)
+
+    val newLoadMetadataDetail: LoadMetadataDetails = new LoadMetadataDetails
+
+    // for an external data source table there are no index files, so there is
+    // no need to write a segment file
+
+    // update the table status file
+    newLoadMetadataDetail.setSegmentFile(null)
+    newLoadMetadataDetail.setSegmentStatus(SegmentStatus.SUCCESS)
+    newLoadMetadataDetail.setLoadStartTime(loadModel.getFactTimeStamp)
+    newLoadMetadataDetail.setLoadEndTime(CarbonUpdateUtil.readCurrentTime())
+    newLoadMetadataDetail.setIndexSize("1")
+    newLoadMetadataDetail.setDataSize("1")
+    newLoadMetadataDetail.setFileFormat(FileFormat.EXTERNAL)
+    newLoadMetadataDetail.setFactFilePath(factFilePath)
+
+    val done = CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetadataDetail, loadModel, true,
+      false, uuid)
+    if (!done) {
+      val errorMsg =
+        s"""
+           | Data load failed due to a table status update failure for
+           | ${loadModel.getDatabaseName}.${loadModel.getTableName}
+         """.stripMargin
+      throw new Exception(errorMsg)
+    } else {
+      DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2009009a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 8eb47fc..d6691f6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     alterPartition | datamapManagement | alterTableFinishStreaming | stream
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
-    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
+    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addSegment
 
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
@@ -443,6 +443,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         partition = partitionSpec)
   }
 
+  /**
+   * Syntax:
+   * ALTER TABLE [dbName.]tableName ADD SEGMENT LOCATION 'path/to/data'
+   */
+  protected lazy val addSegment: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+      (ADD ~> SEGMENT ~> LOCATION ~> stringLit) <~ opt(";") ^^ {
+      case dbName ~ tableName ~ filePath =>
+        CarbonAddSegmentCommand(convertDbNameToLowerCase(dbName), tableName, filePath)
+    }
+
   protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
     DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
       (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~
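
For context, a minimal usage sketch (not part of this commit): once the parser
change above is in, the addSegment rule routes the statement below to
CarbonAddSegmentCommand. This assumes a SparkSession whose SQL parser includes
the Carbon extensions; the table name and file path are hypothetical
placeholders.

    import org.apache.spark.sql.SparkSession

    object AddSegmentExample {
      def main(args: Array[String]): Unit = {
        // Assumes the session is already configured with the Carbon SQL parser.
        val spark = SparkSession.builder()
          .appName("AddSegmentExample")
          .getOrCreate()
        // Parsed by the addSegment rule above and executed as a
        // CarbonAddSegmentCommand with dbName "default", table "sales" and the
        // given location (both names are illustrative only).
        spark.sql("ALTER TABLE default.sales ADD SEGMENT LOCATION '/data/sales/part1.csv'")
        spark.stop()
      }
    }

As processData above shows, the files at the given location are not copied:
the segment is recorded in the table status file with FileFormat.EXTERNAL and
a fact file path pointing at the original files.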