From: ravipesala@apache.org
To: commits@carbondata.apache.org
Date: Mon, 17 Jul 2017 01:56:53 -0000
Message-Id: <816141fea20c4a40a63cb0be84ada724@git.apache.org>
Subject: [07/15] carbondata git commit: [CARBONDATA-1284] Implement hive based schema storage in carbon

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
new file mode 100644
index 0000000..048681c
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
+import org.apache.carbondata.core.fileoperations.FileWriteOperation
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.processing.merger.TableMeta
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {
+  // clear the metadata
+  def clear(): Unit = {
+    tablesMeta.clear()
+  }
+}
+
+case class CarbonMetaData(dims: Seq[String],
+    msrs: Seq[String],
+    carbonTable: CarbonTable,
+    dictionaryMap: DictionaryMap)
+
+case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
+  def get(name: String): Option[Boolean] = {
+    dictionaryMap.get(name.toLowerCase)
+  }
+}
+
+class CarbonFileMetastore(conf: RuntimeConfig, val storePath: String) extends CarbonMetaStore {
+
+  @transient
+  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
+
+  val tableModifiedTimeStore = new java.util.HashMap[String, Long]()
+  tableModifiedTimeStore
+    .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis())
+
+  private val nextId = new AtomicLong(0)
+
+  def nextQueryId: String = {
+    System.nanoTime() + ""
+  }
+
+  lazy val metadata = loadMetadata(storePath, nextQueryId)
+
+
+  /**
+   * Create a CarbonRelation from the given parameters and table identifier.
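+   * The relation is resolved through lookupRelation, so the result always comes from the
+   * freshness-checked carbon metadata cache.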
+   *
+   * @param parameters
+   * @param absIdentifier
+   * @param sparkSession
+   */
+  override def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation = {
+    lookupRelation(TableIdentifier(absIdentifier.getCarbonTableIdentifier.getTableName,
+      Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName)))(sparkSession)
+      .asInstanceOf[CarbonRelation]
+  }
+
+  def lookupRelation(dbName: Option[String], tableName: String)
+    (sparkSession: SparkSession): LogicalPlan = {
+    lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
+  }
+
+  def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(
+      sparkSession.catalog.currentDatabase
+    )
+    val tables = getTableFromMetadata(database, tableIdentifier.table, true)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableIdentifier.table,
+          CarbonSparkUtil.createSparkMeta(t.carbonTable), t)
+      case None =>
+        throw new NoSuchTableException(database, tableIdentifier.table)
+    }
+  }
+
+  /**
+   * This method will search for a table in the catalog metadata
+   *
+   * @param database
+   * @param tableName
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String, readStore: Boolean = false): Option[TableMeta] = {
+    metadata.tablesMeta
+      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+        c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  }
+
+  def tableExists(
+      table: String,
+      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
+    tableExists(TableIdentifier(table, databaseOp))(sparkSession)
+  }
+
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    checkSchemasModifiedTimeAndReloadTables()
+    val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tables = metadata.tablesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+        c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    tables.nonEmpty
+  }
+
+  def loadMetadata(metadataPath: String, queryId: String): MetaData = {
+    val recorder = CarbonTimeStatisticsFactory.createDriverRecorder()
+    val statistic = new QueryStatistic()
+    // create the zookeeper instance once,
+    // if zookeeper is configured as the carbon lock type
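+    // Illustrative sketch, not part of this commit (the constant names are from this file,
+    // the values are assumed): a deployment that wants zookeeper-based locking would set
+    //   CarbonProperties.getInstance()
+    //     .addProperty(CarbonCommonConstants.LOCK_TYPE, "ZOOKEEPERLOCK")
+    //   sparkSession.conf.set(CarbonCommonConstants.ZOOKEEPER_URL, "zk1:2181,zk2:2181")
+    // before the metastore is constructed; the lines below then copy the url into
+    // CarbonProperties for the lock factory to pick up.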
+    val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
+    if (null != zookeeperurl) {
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+    }
+    if (metadataPath == null) {
+      return null
+    }
+    // if no locktype is configured and store type is HDFS set HDFS lock as default
+    if (null == CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+        FileType.HDFS == FileFactory.getFileType(metadataPath)) {
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.LOCK_TYPE,
+          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
+        )
+      LOGGER.info("Default lock type HDFSLOCK is configured")
+    }
+    val fileType = FileFactory.getFileType(metadataPath)
+    val metaDataBuffer = new ArrayBuffer[TableMeta]
+    fillMetaData(metadataPath, fileType, metaDataBuffer)
+    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
+    statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
+      System.currentTimeMillis())
+    recorder.recordStatisticsForDriver(statistic, queryId)
+    MetaData(metaDataBuffer)
+  }
+
+  private def fillMetaData(basePath: String, fileType: FileType,
+      metaDataBuffer: ArrayBuffer[TableMeta]): Unit = {
+    val databasePath = basePath // + "/schemas"
+    try {
+      if (FileFactory.isFileExist(databasePath, fileType)) {
+        val file = FileFactory.getCarbonFile(databasePath, fileType)
+        val databaseFolders = file.listFiles()
+
+        databaseFolders.foreach(databaseFolder => {
+          if (databaseFolder.isDirectory) {
+            val dbName = databaseFolder.getName
+            val tableFolders = databaseFolder.listFiles()
+
+            tableFolders.foreach(tableFolder => {
+              if (tableFolder.isDirectory) {
+                val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName,
+                  tableFolder.getName, UUID.randomUUID().toString)
+                val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath,
+                  carbonTableIdentifier)
+                val tableMetadataFile = carbonTablePath.getSchemaFilePath
+
+                if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+                  val tableName = tableFolder.getName
+                  val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName
+                  val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+                  val schemaConverter = new ThriftWrapperSchemaConverterImpl
+                  val wrapperTableInfo = schemaConverter
+                    .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath)
+                  val schemaFilePath = CarbonStorePath
+                    .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath
+                  wrapperTableInfo.setStorePath(storePath)
+                  wrapperTableInfo
+                    .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+                  CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+                  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+                  metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath,
+                    carbonTable)
+                }
+              }
+            })
+          }
+        })
+      } else {
+        // Create folders and files.
+        FileFactory.mkdirs(databasePath, fileType)
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        s.printStackTrace()
+        // Create folders and files.
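+        // (illustrative note) a missing store path is handled the same way as in the
+        // else-branch above: bootstrap the database folder and start from an empty
+        // metadata buffer; the first CREATE TABLE then writes the initial schema file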
+        FileFactory.mkdirs(databasePath, fileType)
+    }
+  }
+
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param schemaEvolutionEntry
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    if (schemaEvolutionEntry != null) {
+      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
+    }
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        carbonStorePath)
+    createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName)(sparkSession)
+  }
+
+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        carbonStorePath)
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionEntries.remove(evolutionEntries.size() - 1)
+    createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName)(sparkSession)
+  }
+
+
+
+  /**
+   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
+   * Load CarbonTable from wrapper tableInfo
+   *
+   */
+  def createTableFromThrift(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      dbName: String, tableName: String)(sparkSession: SparkSession): (String, String) = {
+    if (tableExists(tableName, Some(dbName))(sparkSession)) {
+      sys.error(s"Table [$tableName] already exists under Database [$dbName]")
+    }
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val thriftTableInfo = schemaConverter
+      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
+      .add(schemaEvolutionEntry)
+    val carbonTablePath = createSchemaThriftFile(tableInfo,
+      thriftTableInfo,
+      dbName,
+      tableName)(sparkSession)
+    LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
+    (carbonTablePath, "")
+  }
+
+  /**
+   * This method will write the schema thrift file in carbon store and load table metadata
+   *
+   * @param tableInfo
+   * @param thriftTableInfo
+   * @param dbName
+   * @param tableName
+   * @param sparkSession
+   * @return
+   */
+  private def createSchemaThriftFile(
+      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      dbName: String, tableName: String)
+    (sparkSession: SparkSession): String = {
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      tableInfo.getFactTable.getTableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(storePath)
+    val fileType = FileFactory.getFileType(schemaMetadataPath)
+    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
+      FileFactory.mkdirs(schemaMetadataPath, fileType)
+    }
+    val thriftWriter = new ThriftWriter(schemaFilePath, false)
+    thriftWriter.open(FileWriteOperation.OVERWRITE)
+    thriftWriter.write(thriftTableInfo)
+    thriftWriter.close()
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
+      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
+    metadata.tablesMeta += tableMeta
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+    carbonTablePath.getPath
+  }
+
+  /**
+   * This method will remove the table meta from catalog metadata array
+   *
+   * @param dbName
+   * @param tableName
+   */
+  def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
+    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
+    metadataToBeRemoved match {
+      case Some(tableMeta) =>
+        metadata.tablesMeta -= tableMeta
+        CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+      case None =>
+        LOGGER.debug(s"No entry for table $tableName in database $dbName")
+    }
+  }
+
+  private def updateMetadataByWrapperTable(
+      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
+
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      wrapperTableInfo.getTableUniqueName)
+    for (i <- metadata.tablesMeta.indices) {
+      if (wrapperTableInfo.getTableUniqueName.equals(
+        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
+        metadata.tablesMeta(i).carbonTable = carbonTable
+      }
+    }
+  }
+
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
+
+    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis())
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
+    wrapperTableInfo
+      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+    wrapperTableInfo.setStorePath(storePath)
+    updateMetadataByWrapperTable(wrapperTableInfo)
+  }
+
+
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
+    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = tableIdentifier.table.toLowerCase
+
+    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getPath
+
+    val fileType = FileFactory.getFileType(tablePath)
+    FileFactory.isFileExist(tablePath, fileType)
+  }
+
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession) {
+    val dbName = tableIdentifier.database.get
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    if (null != carbonTable) {
+      // clear driver B-tree and dictionary cache
+      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+    }
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // while dropping we should refresh the schema-modified time so that any change made
+      // from another beeline session is picked up before the drop proceeds
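+      // Cross-driver coordination sketch (behaviour inferred from this file): every driver
+      // caches the store's schemas-modified timestamp in tableModifiedTimeStore; the call
+      // below re-reads the shared timestamp file under storePath and, when it differs from
+      // the cached value, reloads the whole metadata cache before the table is dropped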
+      checkSchemasModifiedTimeAndReloadTables
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+        tableIdentifier.table)
+      metadataToBeRemoved match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
+      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+      // discard cached table info in cachedDataSourceTables
+      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+    }
+  }
+
+  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
+    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
+    val timestampFileType = FileFactory.getFileType(timestampFile)
+    (timestampFile, timestampFileType)
+  }
+
+  /**
+   * This method will put the updated timestamp of schema file in the table modified time store map
+   *
+   * @param timeStamp
+   */
+  private def updateSchemasUpdatedTime(timeStamp: Long) {
+    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
+  }
+
+  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String) {
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(databaseName, tableName))
+  }
+
+  /**
+   * This method will read the last-modified time of the (empty) schema timestamp file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  private def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+    } else {
+      System.currentTimeMillis()
+    }
+  }
+
+  /**
+   * This method will create the empty schema timestamp file if it does not exist and
+   * touch its modified time
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  private def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
+      FileFactory.createNewFile(timestampFile, timestampFileType)
+    }
+    val systemTime = System.currentTimeMillis()
+    FileFactory.getCarbonFile(timestampFile, timestampFileType)
+      .setLastModifiedTime(systemTime)
+    systemTime
+  }
+
+  def checkSchemasModifiedTimeAndReloadTables() {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
+        getLastModifiedTime ==
+          tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+        refreshCache()
+      }
+    }
+  }
+
+  private def refreshCache() {
+    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
+  }
+
+  override def isReadFromHiveMetaStore: Boolean = false
+
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
+    metadata.tablesMeta.map(_.carbonTable)
+
+  override def getThriftTableInfo(tablePath: CarbonTablePath)
+    (sparkSession: SparkSession): TableInfo = {
+    val tableMetadataFile = tablePath.getSchemaFilePath
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
new file mode 100644
index 0000000..03d0bde
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.format
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.processing.merger.TableMeta
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+/**
+ * Metastore that stores the carbon schema in Hive
+ */
+class CarbonHiveMetaStore(conf: RuntimeConfig, storePath: String)
+  extends CarbonFileMetastore(conf, storePath) {
+
+  override def isReadFromHiveMetaStore: Boolean = true
+
+
+  /**
+   * Create a CarbonRelation from the given parameters and table identifier.
+   *
+   * @param parameters
+   * @param absIdentifier
+   * @param sparkSession
+   */
+  override def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation = {
+    val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava)
+    if (info != null) {
+      val table = CarbonTable.buildFromTableInfo(info)
+      val meta = new TableMeta(table.getCarbonTableIdentifier,
+        table.getStorePath, table)
+      CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName,
+        CarbonSparkUtil.createSparkMeta(table), meta)
+    } else {
+      super.createCarbonRelation(parameters, absIdentifier, sparkSession)
+    }
+  }
+
+  override def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan = {
+    val database = tableIdentifier.database.getOrElse(
+      sparkSession.catalog.currentDatabase)
+    val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      case SubqueryAlias(_,
+          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+          _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case LogicalRelation(
+          carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        carbonDatasourceHadoopRelation.carbonRelation
+      case _ => throw new NoSuchTableException(database, tableIdentifier.table)
+    }
+    relation
+  }
+
+  /**
+   * This method will search for a table in the catalog metadata
+   *
+   * @param database
+   * @param tableName
+   * @return
+   */
+  override def getTableFromMetadata(database: String,
+      tableName: String,
+      readStore: Boolean): Option[TableMeta] = {
+    if (!readStore) {
+      None
+    } else {
+      super.getTableFromMetadata(database, tableName, readStore)
+    }
+  }
+
+  override def tableExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    try {
+      lookupRelation(tableIdentifier)(sparkSession)
+    } catch {
+      case e: Exception =>
+        return false
+    }
+    true
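+    // (note) existence in the Hive-backed metastore is purely catalog-driven: a successful
+    // lookupRelation means the table exists; any lookup failure is mapped to false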
+  }
+
+  override def loadMetadata(metadataPath: String,
+      queryId: String): MetaData = {
+    MetaData(new ArrayBuffer[TableMeta])
+  }
+
+
+  /**
+   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
+   * Load CarbonTable from wrapper tableInfo
+   *
+   */
+  override def createTableFromThrift(tableInfo: TableInfo, dbName: String,
+      tableName: String)(sparkSession: SparkSession): (String, String) = {
+    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      tableInfo.getFactTable.getTableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    tableInfo.setMetaDataFilepath(schemaMetadataPath)
+    tableInfo.setStorePath(storePath)
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    (carbonTablePath.getPath, CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ","))
+  }
+
+  /**
+   * This method will remove the table meta from catalog metadata array
+   *
+   * @param dbName
+   * @param tableName
+   */
+  override def removeTableFromMetadata(dbName: String,
+      tableName: String): Unit = {
+    // do nothing
+  }
+
+  override def isTablePathExists(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    tableExists(tableIdentifier)(sparkSession)
+  }
+
+  override def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): Unit = {
+    val dbName = tableIdentifier.database.get
+    val tableName = tableIdentifier.table
+
+    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
+      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    if (null != carbonTable) {
+      // clear driver B-tree and dictionary cache
+      ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
+    }
+    val fileType = FileFactory.getFileType(metadataFilePath)
+
+    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+      // while dropping we should refresh the schema-modified time so that any change made
+      // from another beeline session is picked up before the drop proceeds
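+      // Unlike CarbonFileMetastore.dropTable there is no in-memory TableMeta list or
+      // schema timestamp file to maintain here: only the physical Metadata folder is
+      // deleted below, after which the table is invalidated and dropped from Hive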
+      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+    }
+    CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+    // discard cached table info in cachedDataSourceTables
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  override def checkSchemasModifiedTimeAndReloadTables(): Unit = {
+    // do nothing now
+  }
+
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
+    // Todo
+    Seq()
+  }
+
+  override def getThriftTableInfo(tablePath: CarbonTablePath)
+    (sparkSession: SparkSession): format.TableInfo = {
+    val identifier = tablePath.getCarbonTableIdentifier
+    val relation = lookupRelation(TableIdentifier(identifier.getTableName,
+      Some(identifier.getDatabaseName)))(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = relation.metaData.carbonTable
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
+      carbonTable.getDatabaseName,
+      carbonTable.getFactTableName)
+  }
+
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param schemaEvolutionEntry
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    if (schemaEvolutionEntry != null) {
+      thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
+    }
+    updateHiveMetaStore(newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      carbonStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+
+  private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String,
+      sparkSession: SparkSession,
+      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        carbonStorePath)
+    wrapperTableInfo.setStorePath(storePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, newTableIdentifier)
+    val schemaMetadataPath =
+      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val dbName = oldTableIdentifier.getDatabaseName
+    val tableName = oldTableIdentifier.getTableName
+    val carbonUpdatedIdentifier = new CarbonTableIdentifier(dbName, tableName,
+      wrapperTableInfo.getFactTable.getTableId)
+    val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
+    sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive(
+      s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
+    sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
+    removeTableFromMetadata(wrapperTableInfo.getDatabaseName,
+      wrapperTableInfo.getFactTable.getTableName)
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    CarbonStorePath.getCarbonTablePath(storePath, carbonUpdatedIdentifier).getPath
+  }
+
+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionEntries.remove(evolutionEntries.size() - 1)
+    updateHiveMetaStore(carbonTableIdentifier,
+      carbonTableIdentifier,
+      thriftTableInfo,
+      carbonStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
new file mode 100644
index 0000000..ab27b4f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * Interface for the Carbon metastore
+ */
+trait CarbonMetaStore {
+
+  def lookupRelation(dbName: Option[String], tableName: String)
+    (sparkSession: SparkSession): LogicalPlan
+
+  def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): LogicalPlan
+
+  /**
+   * Create a CarbonRelation from the given parameters and table identifier.
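+   * Implementations decide where the schema is read from: CarbonFileMetastore reads the
+   * schema file under the store path, while CarbonHiveMetaStore rebuilds it from the
+   * table's Hive serde properties.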
+   * @param parameters
+   * @param absIdentifier
+   * @param sparkSession
+   */
+  def createCarbonRelation(parameters: Map[String, String],
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): CarbonRelation
+
+  /**
+   * Get table meta
+   * TODO remove it if possible
+   * @param database
+   * @param tableName
+   * @param readStore
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String,
+      readStore: Boolean = false): Option[TableMeta]
+
+  def tableExists(
+      table: String,
+      databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean
+
+  def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
+
+  def loadMetadata(metadataPath: String, queryId: String): MetaData
+
+  /**
+   * This method will overwrite the existing schema and update it with the given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param schemaEvolutionEntry
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      carbonStorePath: String)(sparkSession: SparkSession): String
+
+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String
+
+  /**
+   *
+   * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
+   * Load CarbonTable from wrapper tableInfo
+   *
+   */
+  def createTableFromThrift(tableInfo: table.TableInfo,
+      dbName: String,
+      tableName: String)(sparkSession: SparkSession): (String, String)
+
+  /**
+   * This method will remove the table meta from catalog metadata array
+   *
+   * @param dbName
+   * @param tableName
+   */
+  def removeTableFromMetadata(dbName: String, tableName: String): Unit
+
+  def updateMetadataByThriftTable(schemaFilePath: String,
+      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit
+
+  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean
+
+  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession)
+
+  def updateAndTouchSchemasUpdatedTime(databaseName: String, tableName: String)
+
+  def checkSchemasModifiedTimeAndReloadTables()
+
+  def isReadFromHiveMetaStore: Boolean
+
+  def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
+
+  def storePath: String
+
+  def getThriftTableInfo(tablePath: CarbonTablePath)(sparkSession: SparkSession): TableInfo
+
+}
+
+/**
+ * Factory for Carbon metastore
+ */
+object CarbonMetaStoreFactory {
+
+  def createCarbonMetaStore(conf: RuntimeConfig, storePath: String): CarbonMetaStore = {
+    val readSchemaFromHiveMetaStore = readSchemaFromHive(conf)
+    if (readSchemaFromHiveMetaStore) {
+      new CarbonHiveMetaStore(conf, storePath)
+    } else {
+      new CarbonFileMetastore(conf, storePath)
+    }
+  }
+
+  def readSchemaFromHive(conf: RuntimeConfig): Boolean = {
+    val readSchemaFromHive = {
+      if (conf.contains(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE)) {
+        conf.get(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE)
+      } else if
(System.getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE) != null) { + System.getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE) + } else { + CarbonProperties.getInstance(). + getProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + readSchemaFromHive.toBoolean + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a9debfc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala deleted file mode 100644 index 04a94ce..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ /dev/null @@ -1,960 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io._ -import java.util.{GregorianCalendar, LinkedHashSet, UUID} -import java.util.concurrent.atomic.AtomicLong - -import scala.Array.canBuildFrom -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.util.parsing.combinator.RegexParsers - -import org.apache.spark.sql.{RuntimeConfig, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException} -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.command.Partitioner -import org.apache.spark.sql.types._ - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.CarbonFile -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType -import org.apache.carbondata.core.fileoperations.FileWriteOperation -import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} -import org.apache.carbondata.core.reader.ThriftReader -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.core.writer.ThriftWriter -import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.processing.merger.TableMeta -import org.apache.carbondata.spark.util.CarbonSparkUtil - -case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) { - // clear the metadata - def clear(): Unit = { - tablesMeta.clear() - } -} - -case class CarbonMetaData(dims: Seq[String], - msrs: Seq[String], - carbonTable: CarbonTable, - dictionaryMap: DictionaryMap) - -object CarbonMetastore { - - def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = { - val createTBase = new ThriftReader.TBaseCreator() { - override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = { - new TableInfo() - } - } - val thriftReader = new ThriftReader(schemaFilePath, createTBase) - var tableInfo: TableInfo = null - try { - thriftReader.open() - tableInfo = thriftReader.read().asInstanceOf[TableInfo] - } finally { - thriftReader.close() - } - tableInfo - } - - def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = { - val thriftWriter = new ThriftWriter(schemaFilePath, false) - try { - thriftWriter.open() - thriftWriter.write(tableInfo); - } finally { - thriftWriter.close() - } - } - -} - -case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { - def get(name: String): Option[Boolean] = { - dictionaryMap.get(name.toLowerCase) - } -} - -class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { - - @transient - val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") - - val tableModifiedTimeStore = new java.util.HashMap[String, Long]() - tableModifiedTimeStore - .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis()) - - private val nextId = new AtomicLong(0) - - def nextQueryId: String = { - System.nanoTime() + "" - } - - val metadata = loadMetadata(storePath, nextQueryId) - - def getTableCreationTime(databaseName: String, tableName: String): Long = { - val tableMeta = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) - val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime - tableCreationTime - } - - def cleanStore(): Unit = { - try { - val fileType = FileFactory.getFileType(storePath) - FileFactory.deleteFile(storePath, fileType) - metadata.clear() - } catch { - case e: Throwable => LOGGER.error(e, "clean store failed") - } - } - - def lookupRelation(dbName: Option[String], tableName: String) - (sparkSession: SparkSession): LogicalPlan = { - lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) - } - - def lookupRelation(tableIdentifier: TableIdentifier, alias: Option[String] = None) - (sparkSession: SparkSession): LogicalPlan = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse( - sparkSession.catalog.currentDatabase - ) - val tables = getTableFromMetadata(database, tableIdentifier.table) - tables match { - case Some(t) => - CarbonRelation(database, tableIdentifier.table, - CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias) - case None => - throw new 
NoSuchTableException(database, tableIdentifier.table) - } - } - - /** - * This method will search for a table in the catalog metadata - * - * @param database - * @param tableName - * @return - */ - def getTableFromMetadata(database: String, - tableName: String): Option[TableMeta] = { - metadata.tablesMeta - .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) - } - - def tableExists( - table: String, - databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = databaseOp.getOrElse(sparkSession.catalog.currentDatabase) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(table)) - tables.nonEmpty - } - - def tableExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table)) - tables.nonEmpty - } - - def loadMetadata(metadataPath: String, queryId: String): MetaData = { - val recorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - // creating zookeeper instance once. - // if zookeeper is configured as carbon lock type. - val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null) - if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) - } - if (metadataPath == null) { - return null - } - // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && - FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, - CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS - ) - LOGGER.info("Default lock type HDFSLOCK is configured") - } - val fileType = FileFactory.getFileType(metadataPath) - val metaDataBuffer = new ArrayBuffer[TableMeta] - fillMetaData(metadataPath, fileType, metaDataBuffer) - updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) - statistic.addStatistics(QueryStatisticsConstants.LOAD_META, - System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - MetaData(metaDataBuffer) - } - - private def fillMetaData(basePath: String, fileType: FileType, - metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { - val databasePath = basePath // + "/schemas" - try { - if (FileFactory.isFileExist(databasePath, fileType)) { - val file = FileFactory.getCarbonFile(databasePath, fileType) - val databaseFolders = file.listFiles() - - databaseFolders.foreach(databaseFolder => { - if (databaseFolder.isDirectory) { - val dbName = databaseFolder.getName - val tableFolders = databaseFolder.listFiles() - - tableFolders.foreach(tableFolder => { - if (tableFolder.isDirectory) { - val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName, - tableFolder.getName, UUID.randomUUID().toString) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, - carbonTableIdentifier) - val tableMetadataFile = 
carbonTablePath.getSchemaFilePath - - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableName = tableFolder.getName - val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName - val tableInfo: TableInfo = readSchemaFile(tableMetadataFile) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - carbonTable) - } - } - }) - } - }) - } else { - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } catch { - case s: java.io.FileNotFoundException => - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } - - /** - * This method will read the schema file from a given path - * - * @param schemaFilePath - * @return - */ - def readSchemaFile(schemaFilePath: String): TableInfo = { - val createTBase = new ThriftReader.TBaseCreator() { - override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = { - new TableInfo() - } - } - val thriftReader = new ThriftReader(schemaFilePath, createTBase) - thriftReader.open() - val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo] - thriftReader.close() - tableInfo - } - - /** - * This method will overwrite the existing schema and update it with the given details - * - * @param carbonTableIdentifier - * @param thriftTableInfo - * @param schemaEvolutionEntry - * @param carbonStorePath - * @param sparkSession - */ - def updateTableSchema(carbonTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - schemaEvolutionEntry: SchemaEvolutionEntry, - carbonStorePath: String) - (sparkSession: SparkSession): String = { - val schemaConverter = new ThriftWrapperSchemaConverterImpl - thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName, - carbonStorePath) - createSchemaThriftFile(wrapperTableInfo, - thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName)(sparkSession) - } - - /** - * This method will is used to remove the evolution entry in case of failure. 
- * - * @param carbonTableIdentifier - * @param thriftTableInfo - * @param carbonStorePath - * @param sparkSession - */ - def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - carbonStorePath: String) - (sparkSession: SparkSession): String = { - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName, - carbonStorePath) - val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history - evolutionEntries.remove(evolutionEntries.size() - 1) - createSchemaThriftFile(wrapperTableInfo, - thriftTableInfo, - carbonTableIdentifier.getDatabaseName, - carbonTableIdentifier.getTableName)(sparkSession) - } - - - - /** - * - * Prepare Thrift Schema from wrapper TableInfo and write to Schema file. - * Load CarbonTable from wrapper tableInfo - * - */ - def createTableFromThrift( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - dbName: String, tableName: String) - (sparkSession: SparkSession): String = { - if (tableExists(tableName, Some(dbName))(sparkSession)) { - sys.error(s"Table [$tableName] already exists under Database [$dbName]") - } - val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val thriftTableInfo = schemaConverter - .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history - .add(schemaEvolutionEntry) - val carbonTablePath = createSchemaThriftFile(tableInfo, - thriftTableInfo, - dbName, - tableName)(sparkSession) - LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - carbonTablePath - } - - /** - * This method will write the schema thrift file in carbon store and load table metadata - * - * @param tableInfo - * @param thriftTableInfo - * @param dbName - * @param tableName - * @param sparkSession - * @return - */ - private def createSchemaThriftFile( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - thriftTableInfo: org.apache.carbondata.format.TableInfo, - dbName: String, tableName: String) - (sparkSession: SparkSession): String = { - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaFilePath = carbonTablePath.getSchemaFilePath - val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) - val fileType = FileFactory.getFileType(schemaMetadataPath) - if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - FileFactory.mkdirs(schemaMetadataPath, fileType) - } - val thriftWriter = new ThriftWriter(schemaFilePath, false) - thriftWriter.open(FileWriteOperation.OVERWRITE) - thriftWriter.write(thriftTableInfo) - thriftWriter.close() - removeTableFromMetadata(dbName, tableName) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, - CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)) - metadata.tablesMeta += tableMeta - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - 
-  /**
-   * This method will write the schema thrift file in the carbon store and load table metadata
-   *
-   * @param tableInfo
-   * @param thriftTableInfo
-   * @param dbName
-   * @param tableName
-   * @param sparkSession
-   * @return
-   */
-  private def createSchemaThriftFile(
-      tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo,
-      thriftTableInfo: org.apache.carbondata.format.TableInfo,
-      dbName: String, tableName: String)
-    (sparkSession: SparkSession): String = {
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName,
-      tableInfo.getFactTable.getTableId)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setStorePath(storePath)
-    val fileType = FileFactory.getFileType(schemaMetadataPath)
-    if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      FileFactory.mkdirs(schemaMetadataPath, fileType)
-    }
-    val thriftWriter = new ThriftWriter(schemaFilePath, false)
-    thriftWriter.open(FileWriteOperation.OVERWRITE)
-    thriftWriter.write(thriftTableInfo)
-    thriftWriter.close()
-    removeTableFromMetadata(dbName, tableName)
-    CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
-    val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
-      CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
-    metadata.tablesMeta += tableMeta
-    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-    carbonTablePath.getPath
-  }
-
-  /**
-   * This method will remove the table meta from the catalog metadata array
-   *
-   * @param dbName
-   * @param tableName
-   */
-  def removeTableFromMetadata(dbName: String, tableName: String): Unit = {
-    val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, tableName)
-    metadataToBeRemoved match {
-      case Some(tableMeta) =>
-        metadata.tablesMeta -= tableMeta
-        CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-      case None =>
-        LOGGER.debug(s"No entry for table $tableName in database $dbName")
-    }
-  }
-
-  private def updateMetadataByWrapperTable(
-      wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = {
-
-    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      wrapperTableInfo.getTableUniqueName)
-    for (i <- metadata.tablesMeta.indices) {
-      if (wrapperTableInfo.getTableUniqueName.equals(
-        metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) {
-        metadata.tablesMeta(i).carbonTable = carbonTable
-      }
-    }
-  }
-
-  def updateMetadataByThriftTable(schemaFilePath: String,
-      tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = {
-
-    tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-      .setTime_stamp(System.currentTimeMillis())
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath)
-    wrapperTableInfo
-      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setStorePath(storePath)
-    updateMetadataByWrapperTable(wrapperTableInfo)
-  }
-
-  /**
-   * Shows all schemas whose database name matches the given pattern
-   */
-  def showDatabases(schemaLike: Option[String]): Seq[String] = {
-    checkSchemasModifiedTimeAndReloadTables()
-    metadata.tablesMeta.map { c =>
-      schemaLike match {
-        case Some(name) =>
-          if (c.carbonTableIdentifier.getDatabaseName.contains(name)) {
-            c.carbonTableIdentifier.getDatabaseName
-          } else {
-            null
-          }
-        case _ => c.carbonTableIdentifier.getDatabaseName
-      }
-    }.filter(f => f != null)
-  }
-
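// Illustrative behaviour of showDatabases (table names assumed): with cached tables
// default_t1 and sales_t2, showDatabases(Some("def")) returns Seq("default"), since the
// match is a plain substring containment check on the database name, while
// showDatabases(None) returns one entry per cached table, i.e. Seq("default", "sales").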
-  /**
-   * Shows all tables in all schemas.
-   */
-  def getAllTables(): Seq[TableIdentifier] = {
-    checkSchemasModifiedTimeAndReloadTables()
-    metadata.tablesMeta.map { c =>
-      TableIdentifier(c.carbonTableIdentifier.getTableName,
-        Some(c.carbonTableIdentifier.getDatabaseName))
-    }
-  }
-
-  def isTablePathExists(tableIdentifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
-    val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = tableIdentifier.table.toLowerCase
-
-    val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getPath
-
-    val fileType = FileFactory.getFileType(tablePath)
-    FileFactory.isFileExist(tablePath, fileType)
-  }
-
-  def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession) {
-    val dbName = tableIdentifier.database.get
-    val tableName = tableIdentifier.table
-
-    val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath,
-      new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath
-
-    val fileType = FileFactory.getFileType(metadataFilePath)
-
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      // on drop, refresh the schema modified time first, so that changes made from another
-      // beeline session are picked up before the table is removed.
-      checkSchemasModifiedTimeAndReloadTables()
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
-        tableIdentifier.table)
-      metadataToBeRemoved match {
-        case Some(tableMeta) =>
-          metadata.tablesMeta -= tableMeta
-          CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName)
-          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
-        case None =>
-          LOGGER.info(s"Metadata does not contain entry for table $tableName in database $dbName")
-      }
-      CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
-      // discard cached table info in cachedDataSourceTables
-      sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
-    }
-  }
-
-  private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-    val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    val timestampFileType = FileFactory.getFileType(timestampFile)
-    (timestampFile, timestampFileType)
-  }
-
-  /**
-   * This method will put the updated timestamp of the schema file in the table modified time
-   * store map
-   *
-   * @param timeStamp
-   */
-  def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put("default", timeStamp)
-  }
-
-  /**
-   * This method will read the timestamp of the empty schema file
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
-    } else {
-      System.currentTimeMillis()
-    }
-  }
-
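// How the timestamp helpers coordinate (a reading of the code above and below, not new
// behaviour): all drivers share one <storePath>/<SCHEMAS_MODIFIED_TIME_FILE> marker file.
// Writers touch it (touchSchemaFileSystemTime) and cache the new mtime
// (updateSchemasUpdatedTime); readers compare the file's mtime against the cached value
// (checkSchemasModifiedTimeAndReloadTables) and reload all table metadata on a mismatch, so
// a schema change made in one beeline session becomes visible to the others.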
-  /**
-   * This method will create the schema timestamp file if it does not exist, and then touch
-   * its modified time
-   *
-   * @param databaseName
-   * @param tableName
-   * @return
-   */
-  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
-      FileFactory.createNewFile(timestampFile, timestampFileType)
-    }
-    val systemTime = System.currentTimeMillis()
-    FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(systemTime)
-    systemTime
-  }
-
-  def checkSchemasModifiedTimeAndReloadTables() {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType("", "")
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
-        getLastModifiedTime ==
-            tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
-        refreshCache()
-      }
-    }
-  }
-
-  def refreshCache() {
-    metadata.tablesMeta = loadMetadata(storePath, nextQueryId).tablesMeta
-  }
-
-  def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = {
-    var schemaLastUpdatedTime = System.currentTimeMillis
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
-    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
-      schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType)
-        .getLastModifiedTime
-    }
-    schemaLastUpdatedTime
-  }
-
-  def readTableMetaDataFile(tableFolder: CarbonFile,
-      fileType: FileFactory.FileType):
-  (String, String, String, String, Partitioner, Long) = {
-    val tableMetadataFile = tableFolder.getAbsolutePath + "/metadata"
-
-    var schema: String = ""
-    var databaseName: String = ""
-    var tableName: String = ""
-    var dataPath: String = ""
-    var partitioner: Partitioner = null
-    val cal = new GregorianCalendar(2011, 1, 1)
-    var tableCreationTime = cal.getTime.getTime
-
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      // load metadata
-      val in = FileFactory.getDataInputStream(tableMetadataFile, fileType)
-      var len = 0
-      try {
-        len = in.readInt()
-      } catch {
-        case others: EOFException => len = 0
-      }
-
-      while (len > 0) {
-        val databaseNameBytes = new Array[Byte](len)
-        in.readFully(databaseNameBytes)
-
-        databaseName = new String(databaseNameBytes, "UTF8")
-        val tableNameLen = in.readInt()
-        val tableNameBytes = new Array[Byte](tableNameLen)
-        in.readFully(tableNameBytes)
-        tableName = new String(tableNameBytes, "UTF8")
-
-        val dataPathLen = in.readInt()
-        val dataPathBytes = new Array[Byte](dataPathLen)
-        in.readFully(dataPathBytes)
-        dataPath = new String(dataPathBytes, "UTF8")
-
-        val versionLength = in.readInt()
-        val versionBytes = new Array[Byte](versionLength)
-        in.readFully(versionBytes)
-
-        val schemaLen = in.readInt()
-        val schemaBytes = new Array[Byte](schemaLen)
-        in.readFully(schemaBytes)
-        schema = new String(schemaBytes, "UTF8")
-
-        val partitionLength = in.readInt()
-        val partitionBytes = new Array[Byte](partitionLength)
-        in.readFully(partitionBytes)
-        val inStream = new ByteArrayInputStream(partitionBytes)
-        val objStream = new ObjectInputStream(inStream)
-        partitioner = objStream.readObject().asInstanceOf[Partitioner]
-        objStream.close()
-
-        try {
-          tableCreationTime = in.readLong()
-          len = in.readInt()
-        } catch {
-          case others: EOFException => len = 0
-        }
-      }
-      in.close()
-    }
-
-    (databaseName, tableName, dataPath, schema, partitioner, tableCreationTime)
-  }
-
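// Layout of the legacy "metadata" file parsed above, as implied by the reads (lengths are
// 4-byte ints, strings UTF-8): repeated records of
//   [dbNameLen][dbName][tableNameLen][tableName][dataPathLen][dataPath]
//   [versionLen][version][schemaLen][schema][partitionerLen][java-serialized Partitioner]
//   [tableCreationTime: 8-byte long]
// until EOF, which is detected by the EOFException handlers rather than a record count.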
-  def createDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName.toLowerCase
-    val fileType = FileFactory.getFileType(databasePath)
-    FileFactory.mkdirs(databasePath, fileType)
-  }
-
-  def dropDatabaseDirectory(dbName: String) {
-    val databasePath = storePath + File.separator + dbName
-    val fileType = FileFactory.getFileType(databasePath)
-    if (FileFactory.isFileExist(databasePath, fileType)) {
-      val dbPath = FileFactory.getCarbonFile(databasePath, fileType)
-      CarbonUtil.deleteFoldersAndFiles(dbPath)
-    }
-  }
-
-}
-
-
-object CarbonMetastoreTypes extends RegexParsers {
-  protected lazy val primitiveType: Parser[DataType] =
-    "string" ^^^ StringType |
-    "float" ^^^ FloatType |
-    "int" ^^^ IntegerType |
-    "tinyint" ^^^ ShortType |
-    "short" ^^^ ShortType |
-    "double" ^^^ DoubleType |
-    "long" ^^^ LongType |
-    "binary" ^^^ BinaryType |
-    "boolean" ^^^ BooleanType |
-    fixedDecimalType |
-    "decimal" ^^^ DecimalType(10, 0) |
-    "varchar\\((\\d+)\\)".r ^^^ StringType |
-    "date" ^^^ DateType |
-    "timestamp" ^^^ TimestampType
-
-  protected lazy val fixedDecimalType: Parser[DataType] =
-    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
-      case precision ~ scale =>
-        DecimalType(precision.toInt, scale.toInt)
-    }
-
-  protected lazy val arrayType: Parser[DataType] =
-    "array" ~> "<" ~> dataType <~ ">" ^^ {
-      case tpe => ArrayType(tpe)
-    }
-
-  protected lazy val mapType: Parser[DataType] =
-    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
-    }
-
-  protected lazy val structField: Parser[StructField] =
-    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
-      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
-    }
-
-  protected lazy val structType: Parser[DataType] =
-    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
-      case fields => StructType(fields)
-    }
-
-  protected lazy val dataType: Parser[DataType] =
-    arrayType |
-    mapType |
-    structType |
-    primitiveType
-
-  def toDataType(metastoreType: String): DataType = {
-    parseAll(dataType, metastoreType) match {
-      case Success(result, _) => result
-      case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType")
-    }
-  }
-
-  def toMetastoreType(dt: DataType): String = {
-    dt match {
-      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
-      case StructType(fields) =>
-        s"struct<${
-          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
-            .mkString(",")
-        }>"
-      case StringType => "string"
-      case FloatType => "float"
-      case IntegerType => "int"
-      case ShortType => "tinyint"
-      case DoubleType => "double"
-      case LongType => "bigint"
-      case BinaryType => "binary"
-      case BooleanType => "boolean"
-      case DecimalType() => "decimal"
-      case TimestampType => "timestamp"
-      case DateType => "date"
-    }
-  }
-}
-
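// Illustrative round trips through the parser above (hypothetical REPL session):
//
//   CarbonMetastoreTypes.toDataType("struct<a:int,b:array<decimal(10,2)>>")
//   //   => StructType with an IntegerType field and an ArrayType(DecimalType(10,2)) field
//   CarbonMetastoreTypes.toMetastoreType(LongType)   // => "bigint"
//
// The two directions are not exact inverses: "long" parses to LongType but LongType prints
// as "bigint", and ShortType prints as "tinyint" although "short" also parses.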
-
-/**
- * Represents logical plan for one carbon table
- */
-case class CarbonRelation(
-    databaseName: String,
-    tableName: String,
-    var metaData: CarbonMetaData,
-    tableMeta: TableMeta,
-    alias: Option[String])
-  extends LeafNode with MultiInstanceRelation {
-
-  def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    childDim.getDataType.toString.toLowerCase match {
-      case "array" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:array<${ getArrayChildren(childDim.getColName) }>"
-      case "struct" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:struct<${ getStructChildren(childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
-    }
-  }
-
-  def getArrayChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
-        case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
-        case dType => addDecimalScaleAndPrecision(childDim, dType)
-      }
-    }).mkString(",")
-  }
-
-  def getStructChildren(dimName: String): String = {
-    metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
-        case "array" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(childDim.getColName) }>"
-        case "struct" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ metaData.carbonTable.getChildren(childDim.getColName)
-          .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
-        }>"
-        case dType => s"${ childDim.getColName
-          .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
-      }
-    }).mkString(",")
-  }
-
-  override def newInstance(): LogicalPlan = {
-    CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)
-      .asInstanceOf[this.type]
-  }
-
-  val dimensionsAttr = {
-    val sett = new LinkedHashSet(
-      tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
-        .asScala.asJava)
-    sett.asScala.toSeq.map(dim => {
-      val dimval = metaData.carbonTable
-        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
-      val output: DataType = dimval.getDataType
-        .toString.toLowerCase match {
-        case "array" =>
-          CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
-        case "struct" =>
-          CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
-        case dType =>
-          val dataType = addDecimalScaleAndPrecision(dimval, dType)
-          CarbonMetastoreTypes.toDataType(dataType)
-      }
-
-      AttributeReference(
-        dim.getColName,
-        output,
-        nullable = true)()
-    })
-  }
-
-  val measureAttr = {
-    val factTable = tableMeta.carbonTable.getFactTableName
-    new LinkedHashSet(
-      tableMeta.carbonTable.
-        getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq
-      .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-        metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
-          .toLowerCase match {
-          case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
-          case others => others
-        }),
-        nullable = true)())
-  }
-
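// Illustrative (column shapes assumed): for a dimension declared as
// array<struct<a:int,b:string>>, getArrayChildren/getStructChildren rebuild the metastore
// string "array<struct<a:int,b:string>>" from the flattened child dimensions, and
// CarbonMetastoreTypes.toDataType turns it back into ArrayType(StructType(...)). Decimal
// columns get "(precision,scale)" re-attached first, so the Spark schema keeps the declared
// precision rather than a default decimal(10,0).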
-  override val output = {
-    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
-      .asScala
-    // convert each column to an Attribute
-    columns.filter(!_.isInvisible).map { column =>
-      if (column.isDimension()) {
-        val output: DataType = column.getDataType.toString.toLowerCase match {
-          case "array" =>
-            CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
-          case "struct" =>
-            CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
-          case dType =>
-            val dataType = addDecimalScaleAndPrecision(column, dType)
-            CarbonMetastoreTypes.toDataType(dataType)
-        }
-        AttributeReference(column.getColName, output, nullable = true)(
-          qualifier = Option(tableName + "." + column.getColName))
-      } else {
-        val output = CarbonMetastoreTypes.toDataType {
-          column.getDataType.toString.toLowerCase match {
-            case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
-              .getColumnSchema.getScale + ")"
-            case others => others
-          }
-        }
-        AttributeReference(column.getColName, output, nullable = true)(
-          qualifier = Option(tableName + "." + column.getColName))
-      }
-    }
-  }
-
-  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
-    var dType = dataType
-    if (dimval.getDataType == DECIMAL) {
-      dType +=
-        "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
-    }
-    dType
-  }
-
-  // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
-
-  override def equals(other: Any): Boolean = {
-    other match {
-      case p: CarbonRelation =>
-        p.databaseName == databaseName && p.output == output && p.tableName == tableName
-      case _ => false
-    }
-  }
-
-  def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
-    var dType = dataType
-    if (dimval.getDataType == DECIMAL) {
-      dType +=
-        "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
-    }
-    dType
-  }
-
-  private var tableStatusLastUpdateTime = 0L
-
-  private var sizeInBytesLocalValue = 0L
-
-  def sizeInBytes: Long = {
-    val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime(
-      tableMeta.carbonTable.getAbsoluteTableIdentifier)
-
-    // recompute the directory size only when the table status file has changed since the
-    // last check; otherwise serve the cached value
-    if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        tableMeta.storePath,
-        tableMeta.carbonTableIdentifier).getPath
-      val fileType = FileFactory.getFileType(tablePath)
-      if (FileFactory.isFileExist(tablePath, fileType)) {
-        tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
-        sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath)
-      }
-    }
-    sizeInBytesLocalValue
-  }
-
-}