carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: Problem: 1. Whenever a catalog metadata is refreshed it modified the timestamp of modifiedTime.mdt file which leads to unnecessary refreshing the complete catalog metadata. 2. Array Index of bound exception is throw
Date Fri, 04 Nov 2016 09:36:59 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 01e642fd3 -> f53d323fa


Problem:
1. Whenever the catalog metadata is refreshed, the timestamp of the modifiedTime.mdt
file is modified, which leads to unnecessary refreshing of the complete catalog metadata.
2. An array index out of bounds exception is thrown on failure of table creation.

Analysis:
1. Whenever the carbon environment is initialized, it loads the table metadata into the catalog
and changes the timestamp of the modifiedTime.mdt file. If a parallel beeline session is in
progress, this causes unnecessary refreshing of the catalog metadata.
2. If table creation fails the very first time, the exception block tries to
drop that table and clear its metadata. Drop table uses the filter API and takes the first
element of the result, which throws an array index out of bounds exception if the metadata
array is empty.

Fix:
1. There is no need to update the timestamp of the modifiedTime.mdt file while loading metadata.
It should only be updated on create and drop table operations.
2. Use the find API instead of the filter API; find returns an Option object.

Impact: Carbon catalog refresh, which affects the query and load flows.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/04d62b54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/04d62b54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/04d62b54

Branch: refs/heads/master
Commit: 04d62b54a52b563eab41c2f76c02802bd67aedd9
Parents: 01e642f
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Fri Nov 4 14:06:52 2016 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Nov 4 14:38:18 2016 +0530

----------------------------------------------------------------------
 .../spark/sql/hive/CarbonMetastoreCatalog.scala | 110 +++++++++++++------
 1 file changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/04d62b54/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 75dc235..d551e10 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -131,18 +131,31 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath:
String,
       alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
     checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
-    val tables = metadata.tablesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
-    if (tables.nonEmpty) {
-      CarbonRelation(database, tableIdentifier.table,
-        CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
-    } else {
-      LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
-      throw new NoSuchTableException
+    val tables = getTableFromMetadata(database, tableIdentifier.table)
+    tables match {
+      case Some(t) =>
+        CarbonRelation(database, tableIdentifier.table,
+          CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext)
+      case None =>
+        LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+        throw new NoSuchTableException
     }
   }
 
+  /**
+   * This method will search for a table in the catalog metadata
+   *
+   * @param database
+   * @param tableName
+   * @return
+   */
+  def getTableFromMetadata(database: String,
+      tableName: String): Option[TableMeta] = {
+    metadata.tablesMeta
+      .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+                 c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))
+  }
+
   def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
     checkSchemasModifiedTimeAndReloadTables()
     val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
@@ -178,7 +191,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath:
String,
     val fileType = FileFactory.getFileType(metadataPath)
     val metaDataBuffer = new ArrayBuffer[TableMeta]
     fillMetaData(metadataPath, fileType, metaDataBuffer)
-    updateSchemasUpdatedTime("", "")
+    updateSchemasUpdatedTime(readSchemaFileSystemTime("", ""))
     statistic.addStatistics(QueryStatisticsConstants.LOAD_META,
       System.currentTimeMillis())
     recorder.recordStatisticsForDriver(statistic, queryId)
@@ -314,7 +327,7 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath:
String,
     metadata.tablesMeta += tableMeta
     logInfo(s"Table $tableName for Database $dbName created successfully.")
     LOGGER.info("Table " + tableName + " for Database " + dbName + " created successfully.")
-    updateSchemasUpdatedTime(dbName, tableName)
+    updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
     carbonTablePath.getPath
   }
 
@@ -434,51 +447,78 @@ class CarbonMetastoreCatalog(hiveContext: HiveContext, val storePath:
String,
     val fileType = FileFactory.getFileType(metadataFilePath)
 
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-
+      // while drop we should refresh the schema modified time so that if any thing has changed
+      // in the other beeline need to update.
+      checkSchemasModifiedTimeAndReloadTables
       val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
       CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
-
-      metadata.tablesMeta -= metadata.tablesMeta.filter(
-        c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
-             c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
-      org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-        .removeTable(dbName + "_" + tableName)
-
+      val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName,
+        tableIdentifier.table)
+      metadataToBeRemoved match {
+        case Some(tableMeta) =>
+          metadata.tablesMeta -= tableMeta
+          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+            .removeTable(dbName + "_" + tableName)
+          org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
+            .removeTable(dbName + "_" + tableName)
+          updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
+        case None =>
+          logInfo(s"Metadata does not contain entry for table $tableName in database $dbName")
+      }
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext)
-      updateSchemasUpdatedTime(dbName, tableName)
-
       // discard cached table info in cachedDataSourceTables
       sqlContext.catalog.refreshTable(tableIdentifier)
     }
   }
 
   private def getTimestampFileAndType(databaseName: String, tableName: String) = {
-
     val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-
     val timestampFileType = FileFactory.getFileType(timestampFile)
     (timestampFile, timestampFileType)
   }
 
-  def updateSchemasUpdatedTime(databaseName: String, tableName: String) {
+  /**
+   * This method will put the updated timestamp of schema file in the table modified time
store map
+   *
+   * @param timeStamp
+   */
+  def updateSchemasUpdatedTime(timeStamp: Long) {
+    tableModifiedTimeStore.put("default", timeStamp)
+  }
+
+  /**
+   * This method will read the timestamp of empty schema file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
     val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
+      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
+    } else {
+      System.currentTimeMillis()
+    }
+  }
 
+  /**
+   * This method will check and create an empty schema timestamp file
+   *
+   * @param databaseName
+   * @param tableName
+   * @return
+   */
+  def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = {
+    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
     if (!FileFactory.isFileExist(timestampFile, timestampFileType)) {
       LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
-
-    touchSchemasTimestampFile(databaseName, tableName)
-
-    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME,
-      FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime)
-
-  }
-
-  def touchSchemasTimestampFile(databaseName: String, tableName: String) {
-    val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName)
+    val systemTime = System.currentTimeMillis()
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(System.currentTimeMillis())
+      .setLastModifiedTime(systemTime)
+    systemTime
   }
 
   def checkSchemasModifiedTimeAndReloadTables() {


Mime
View raw message