carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-2986] Table Properties are lost when multiple driver concurrently
Date Thu, 04 Oct 2018 12:35:25 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 11bd0ade9 -> 3edea12a8


[CARBONDATA-2986] Table Properties are lost when multiple driver concurrently

Issue :- When multiple drivers concurrently create the same table, the table properties
are lost.

Root Cause :- The schema file is overwritten from CarbonRelation#createTableIfNotExists because
the table lookup fails. This happens because the .mdt file is updated concurrently and
the current table is removed from the cache in org.apache.spark.sql.hive.CarbonFileMetastore#checkSchemasModifiedTimeAndReloadTable

Solution :- Since the carbon table is already created and the schema file is already written, there is no
need to do the lookup again.

This closes #2785


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3edea12a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3edea12a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3edea12a

Branch: refs/heads/master
Commit: 3edea12a83e70dddb1eca271bf5660f73de272f5
Parents: 11bd0ad
Author: BJangir <babulaljangir111@gmail.com>
Authored: Fri Sep 28 17:17:30 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Oct 4 18:05:06 2018 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonSource.scala  | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3edea12a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 16cee96..cd1087d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -57,6 +57,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   with SchemaRelationProvider with StreamSinkProvider with DataSourceRegister {
 
   override def shortName(): String = "carbondata"
+  private val LOGGER = LogServiceFactory.getLogService(CarbonSource.getClass.getName)
 
   // will be called if hive supported create table command is provided
   override def createRelation(sqlContext: SQLContext,
@@ -143,7 +144,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       .exists(_.table.equalsIgnoreCase(tableName))) {
         getPathForTable(sqlContext.sparkSession, dbName, tableName, newParameters)
     } else {
-        createTableIfNotExists(sqlContext.sparkSession, newParameters, dataSchema)
+      createTableIfNotExists(sqlContext.sparkSession, dbName, tableName, newParameters, dataSchema)
     }
 
     CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), updatedParams,
@@ -160,6 +161,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
   private def createTableIfNotExists(
       sparkSession: SparkSession,
+      dbName: String,
+      tableName: String,
       parameters: Map[String, String],
       dataSchema: StructType): (String, Map[String, String]) = {
 
@@ -167,10 +170,18 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
 
     try {
-      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      (carbonTable.getTablePath, parameters)
+      if (!(parameters.contains("carbonSchemaPartsNo")
+        || parameters.contains("carbonschemapartsno"))) {
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+        (carbonTable.getTablePath, parameters)
+      } else {
+        (getPathForTable(sparkSession, dbName, tableName, parameters))
+      }
+
     } catch {
       case _: NoSuchTableException =>
+        LOGGER.warn("Carbon Table [" +dbName +"] [" +tableName +"] is not found, " +
+          "Now existing Schema will be overwritten with default properties")
         val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
         val identifier = AbsoluteTableIdentifier.from(
           CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),


Mime
View raw message