carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2317] Concurrent datamap with same name and schema creation throws exception
Date Wed, 11 Apr 2018 08:22:15 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 3c48df396 -> cc2a74e65


[CARBONDATA-2317] Concurrent datamap with same name and schema creation throws exception

Concurrent datamap with same name and schema creation throws exception

This closes #2156


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cc2a74e6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cc2a74e6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cc2a74e6

Branch: refs/heads/branch-1.3
Commit: cc2a74e65001ee73da4fad47852984d4f1c42adf
Parents: 3c48df3
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Fri Apr 6 15:17:54 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Wed Apr 11 13:55:41 2018 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 43 ++++++++++++++++++++
 .../table/CarbonCreateTableCommand.scala        | 27 ++++++------
 2 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc2a74e6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 5b7c310..5813211 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -17,7 +17,14 @@
 
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
+
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
 
 import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -415,6 +422,42 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("use default")
   }
 
+  test("test creation of multiple preaggregate of same name concurrently ") {
+    sql("DROP TABLE IF EXISTS tbl_concurr")
+    sql(
+      "create table if not exists  tbl_concurr(imei string,age int,mac string ,prodate timestamp," +
+      "update timestamp,gamepoint double,contrid double) stored by 'carbondata' ")
+
+    var executorService: ExecutorService = Executors.newCachedThreadPool()
+    val tasks = new util.ArrayList[Callable[String]]()
+    var i = 0
+    val count = 5
+    while (i < count) {
+      tasks
+        .add(new QueryTask(
+          s"""create datamap agg_concu1 on table tbl_concurr using
+             |'preaggregate' as select prodate, mac from tbl_concurr group by prodate,mac"""
+            .stripMargin))
+      i = i + 1
+    }
+    executorService.invokeAll(tasks)
+
+    checkExistence(sql("show tables"), true, "agg_concu1", "tbl_concurr")
+    executorService.shutdown()
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "SUCCESS"
+      try {
+        sql(query).collect()
+      } catch {
+        case exception: Exception => LOGGER.error(exception.getMessage)
+      }
+      result
+    }
+  }
+
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
     plan.transform {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cc2a74e6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 22dab27..e05cda6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -119,19 +119,22 @@ case class CarbonCreateTableCommand(
           // isVisible property is added to hive table properties to differentiate between main
           // table and datamaps(like preaggregate). It is false only for datamaps. This is added
           // to improve the show tables performance when filtering the datamaps from main tables
-          sparkSession.sql(
-            s"""CREATE TABLE $dbName.$tableName
-               |(${ rawSchema })
-               |USING org.apache.spark.sql.CarbonSource
-               |OPTIONS (
-               |  tableName "$tableName",
-               |  dbName "$dbName",
-               |  tablePath "$tablePath",
-               |  path "$tablePath",
-               |  isVisible "$isVisible"
-               |  $carbonSchemaString)
-               |  $partitionString
+          // synchronized to prevent concurrently creation of table with same name
+          CarbonCreateTableCommand.synchronized {
+            sparkSession.sql(
+              s"""CREATE TABLE $dbName.$tableName
+                 |(${ rawSchema })
+                 |USING org.apache.spark.sql.CarbonSource
+                 |OPTIONS (
+                 |  tableName "$tableName",
+                 |  dbName "$dbName",
+                 |  tablePath "$tablePath",
+                 |  path "$tablePath",
+                 |  isVisible "$isVisible"
+                 |  $carbonSchemaString)
+                 |  $partitionString
              """.stripMargin)
+          }
         } catch {
           case e: AnalysisException => throw e
           case e: Exception =>


Mime
View raw message