carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [05/50] [abbrv] carbondata git commit: [CARBONDATA-1754][BugFix] Fixed issue occurring on concurrent insert-overwrite and compaction
Date Sun, 28 Jan 2018 06:45:34 GMT
[CARBONDATA-1754][BugFix] Fixed issue occurring on concurrent insert-overwrite and compaction

Description: Concurrent Insert overwrite-Compaction: Compaction job fails at run time if insert
overwrite job is running concurrently.

Problem: When insert-overwrite and compaction are run from two different sessions, the insert
overwrite job succeeds, but after that compaction fails with an exception saying 'Compaction
failed to update metadata for table'.

Solution: Ideally the compaction job should fail at the start with an exception whose message
states that 'insert overwrite is in progress'.

This closes #1711


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11353e2d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11353e2d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11353e2d

Branch: refs/heads/fgdatamap
Commit: 11353e2d0475689a1220ee9022878c655e79f298
Parents: 705b111
Author: SangeetaGulia <sangeeta.gulia@knoldus.in>
Authored: Thu Dec 21 18:42:18 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Jan 8 16:30:54 2018 +0800

----------------------------------------------------------------------
 .../iud/TestInsertOverwriteAndCompaction.scala  | 104 +++++++++++++++++++
 .../exception/ConcurrentOperationException.java |  44 ++++++++
 .../CarbonAlterTableCompactionCommand.scala     |  14 ++-
 3 files changed, 160 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
new file mode 100644
index 0000000..f69d1e7
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertOverwriteAndCompaction.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestInsertOverwriteAndCompaction extends QueryTest with BeforeAndAfterAll {
+  var df: DataFrame = _
+  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+
+  override def beforeAll {
+    dropTable()
+    buildTestData()
+  }
+
+  override def afterAll {
+    executorService.shutdownNow()
+    dropTable()
+  }
+
+
+  private def buildTestData(): Unit = {
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Simulate data and write to table orders
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000)
+      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,"ordersTable"+value))
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary","o_comment")
+    createTable("orders")
+    createTable("orders_overwrite")
+  }
+
+  private def dropTable() = {
+    sql("DROP TABLE IF EXISTS orders")
+    sql("DROP TABLE IF EXISTS orders_overwrite")
+  }
+
+  private def createTable(tableName: String): Unit ={
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "true")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  test("Concurrency test for Insert-Overwrite and compact") {
+    val tasks = new java.util.ArrayList[Callable[String]]()
+    tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite"))
+    tasks.add(new QueryTask("alter table orders compact 'MINOR'"))
+    val results: util.List[Future[String]] = executorService.invokeAll(tasks)
+    val resultList = new util.ArrayList[String]()
+    resultList.add(results.get(0).get)
+    resultList.add(results.get(1).get)
+    assert(resultList.contains("PASS"))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        LOGGER.info("Executing :" + query + Thread.currentThread().getName)
+        sql(query).show()
+      } catch {
+        case _: Exception =>
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
new file mode 100644
index 0000000..1f3c07d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+public class ConcurrentOperationException extends Exception {
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public ConcurrentOperationException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11353e2d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 6daaae5..b10777d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -36,11 +36,13 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent,
AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -56,7 +58,7 @@ case class CarbonAlterTableCompactionCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService =
-    LogServiceFactory.getLogService(this.getClass.getName)
+      LogServiceFactory.getLogService(this.getClass.getName)
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
 
@@ -79,6 +81,14 @@ case class CarbonAlterTableCompactionCommand(
       relation.carbonTable
     }
 
+    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
+    if (isLoadInProgress) {
+      val message = "Cannot run data loading and compaction on same table concurrently. " +
+                    "Please wait for load to finish"
+      LOGGER.error(message)
+      throw new ConcurrentOperationException(message)
+    }
+
     val carbonLoadModel = new CarbonLoadModel()
     carbonLoadModel.setTableName(table.getTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
@@ -162,7 +172,7 @@ case class CarbonAlterTableCompactionCommand(
     }
 
     // reading the start time of data load.
-    val loadStartTime : Long =
+    val loadStartTime: Long =
       if (alterTableModel.factTimeStamp.isEmpty) {
         CarbonUpdateUtil.readCurrentTime
       } else {


Mime
View raw message