carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [03/50] [abbrv] carbondata git commit: [CARBONDATA-1928] Separate the properties for timeout and retries for load flow
Date Sun, 07 Jan 2018 03:05:11 GMT
[CARBONDATA-1928] Separate the properties for timeout and retries for load flow

Currently, a single pair of properties configures the lock retry count and the interval between
retries for all locks. This is problematic when the user raises the retries to 10 or 20 for
concurrent loading: because the same properties govern every other lock as well, all other locks
would then also retry 10 (or 20) times.

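1. Rename the "carbon.load.metadata.lock.retries" and "carbon.load.metadata.lock.retry.timeout.sec"
properties to "carbon.concurrent.lock.retries" and "carbon.concurrent.lock.retry.timeout.sec",
respectively. Concurrent loads use these while waiting to acquire the table status lock.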

2. Introduce new properties, "carbon.lock.retries" and "carbon.lock.retry.timeout.sec", which apply to all other locks.

This closes #1708


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7dcc2e75
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7dcc2e75
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7dcc2e75

Branch: refs/heads/carbonstore
Commit: 7dcc2e75572b4a88cdef246d3b3a0770fb73b8d2
Parents: 8a295b5
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Wed Dec 20 22:13:10 2017 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Tue Dec 26 12:04:24 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 42 ++++++++++++++++----
 .../core/locks/AbstractCarbonLock.java          | 17 ++++++--
 .../carbondata/core/locks/CarbonLockUtil.java   | 13 ++++++
 .../carbondata/core/locks/ICarbonLock.java      |  7 ++++
 .../DataRetentionConcurrencyTestCase.scala      |  2 +-
 .../dataretention/DataRetentionTestCase.scala   |  2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 15 +++----
 .../processing/util/CarbonLoaderUtil.java       | 12 +++++-
 8 files changed, 86 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 71ab668..2021222 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -571,25 +571,53 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_TIMESTAMP_MILLIS = "dd-MM-yyyy HH:mm:ss:SSS";
   /**
-   * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
+   * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK.
+   *
+   * Because we want concurrent loads to be completed even if they have to wait for the lock
+   * therefore taking the default as 100.
+   *
+   * Example: Concurrent loads will use this to wait to acquire the table status lock.
    */
-  public static final int NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT = 3;
+  public static final int NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT = 100;
   /**
    * MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
+   *
+   * * Example: Concurrent loads will use this to wait to acquire the table status lock.
    */
-  public static final int MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT = 5;
+  public static final int MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT = 1;
   /**
    * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
    */
   @CarbonProperty
-  public static final String NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK =
-      "carbon.load.metadata.lock.retries";
+  public static final String NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK =
+      "carbon.concurrent.lock.retries";
   /**
    * MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
    */
   @CarbonProperty
-  public static final String MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK =
-      "carbon.load.metadata.lock.retry.timeout.sec";
+  public static final String MAX_TIMEOUT_FOR_CONCURRENT_LOCK =
+      "carbon.concurrent.lock.retry.timeout.sec";
+
+  /**
+   * NUMBER_OF_TRIES_FOR_CARBON_LOCK
+   */
+  public static final int NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT = 3;
+  /**
+   * MAX_TIMEOUT_FOR_CARBON_LOCK
+   */
+  public static final int MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT = 5;
+  /**
+   * NUMBER_OF_TRIES_FOR_CARBON_LOCK
+   */
+  @CarbonProperty
+  public static final String NUMBER_OF_TRIES_FOR_CARBON_LOCK =
+      "carbon.lock.retries";
+  /**
+   * MAX_TIMEOUT_FOR_CARBON_LOCK
+   */
+  @CarbonProperty
+  public static final String MAX_TIMEOUT_FOR_CARBON_LOCK =
+      "carbon.lock.retry.timeout.sec";
 
   /**
    * compressor for writing/reading carbondata file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
index e927a7e..4aa0a18 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
@@ -51,24 +51,33 @@ public abstract class AbstractCarbonLock implements ICarbonLock {
   }
 
   /**
+   * API for enabling the locking of file with retries.
+   */
+  public boolean lockWithRetries(int retries, int retryInterval) {
+    retryCount = retries;
+    retryTimeout = retryInterval;
+    return lockWithRetries();
+  }
+
+  /**
    * Initializes the retry count and retry timeout.
    * This will determine how many times to retry to acquire lock and the retry timeout.
    */
   protected void initRetry() {
     String retries = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK);
+        .getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK);
     try {
       retryCount = Integer.parseInt(retries);
     } catch (NumberFormatException e) {
-      retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT;
+      retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT;
     }
 
     String maxTimeout = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK);
+        .getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK);
     try {
       retryTimeout = Integer.parseInt(maxTimeout);
     } catch (NumberFormatException e) {
-      retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT;
+      retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 711390b..1fcccfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.locks;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * This class contains all carbon lock utilities
@@ -92,4 +93,16 @@ public class CarbonLockUtil {
         "Acquire table lock failed after retry, please try after some time");
   }
 
+  /**
+   * Get the value for the property. If NumberFormatException is thrown then return default
value.
+   */
+  public static int getLockProperty(String property, int defaultValue) {
+    try {
+      return Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(property));
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
index 27862fd..e964f0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
@@ -37,6 +37,13 @@ public interface ICarbonLock {
   boolean lockWithRetries();
 
   /**
+   * This will acquire the lock and if it doesnt get then it will retry after retryInterval.
+   *
+   * @return
+   */
+  boolean lockWithRetries(int retryCount, int retryInterval);
+
+  /**
    * This method will delete the lock file at the specified location.
    *
    * @param lockFile The path of the lock file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala
index 40b3de0..78f4333 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala
@@ -34,7 +34,7 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll
   private val executorService = Executors.newFixedThreadPool(10)
 
   override def beforeAll {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK,
"1")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
"1")
     sql("drop table if exists concurrent")
     sql(
       "create table concurrent (ID int, date String, country String, name " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 99a729c..bde1f80 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -54,7 +54,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists DataRetentionTable")
     sql("drop table if exists retentionlock")
 
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK,
"1")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK,
"1")
     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
"1")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
"yyyy/MM/dd")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d288122..32e8fb5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -571,8 +571,7 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame],
       carbonLoadModel: CarbonLoadModel,
       updateModel: Option[UpdateTableModel],
-      carbonTable: CarbonTable
-  ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
+      carbonTable: CarbonTable): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]
= {
     val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
 
     val updateRdd = dataFrame.get.rdd
@@ -632,8 +631,7 @@ object CarbonDataRDDFactory {
       updateModel: Option[UpdateTableModel],
       key: String,
       taskNo: Long,
-      iter: Iterator[Row]
-  ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] =
{
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
@@ -718,8 +716,7 @@ object CarbonDataRDDFactory {
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       carbonTable: CarbonTable,
-      operationContext: OperationContext
-  ): Unit = {
+      operationContext: OperationContext): Unit = {
     LOGGER.info(s"compaction need status is" +
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
     if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
@@ -796,8 +793,7 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       loadStatus: SegmentStatus,
       newEntryLoadStatus: SegmentStatus,
-      overwriteTable: Boolean
-  ): Boolean = {
+      overwriteTable: Boolean): Boolean = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status.size > 0 && status(0)
!= null) {
       status(0)._2._1
@@ -814,7 +810,8 @@ object CarbonDataRDDFactory {
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable)
     if (!done) {
-      val errorMessage = "Dataload failed due to failure in table status updation."
+      val errorMessage = s"Dataload failed due to failure in table status updation for" +
+                         s" ${carbonLoadModel.getTableName}"
       LOGGER.audit("Data load is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
       LOGGER.error("Dataload failed due to failure in table status updation.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7dcc2e75/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 49ca254..de59982 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -161,8 +162,14 @@ public final class CarbonLoaderUtil {
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    int retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+    int maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
     try {
-      if (carbonLock.lockWithRetries()) {
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
         LOGGER.info(
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
                 + " for table status updation");
@@ -379,7 +386,8 @@ public final class CarbonLoaderUtil {
     boolean entryAdded =
         CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
     if (!entryAdded) {
-      throw new IOException("Failed to add entry in table status for " + model.getTableName());
+      throw new IOException("Dataload failed due to failure in table status updation for
"
+          + model.getTableName());
     }
   }
 


Mime
View raw message