geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject incubator-geode git commit: GEODE-1800 StoppableCondition has faulty code in await()
Date Fri, 19 Aug 2016 21:31:36 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 1ab59ffb2 -> 88b561d64


GEODE-1800 StoppableCondition has faulty code in await()

The faulty methods in StoppableCondition have been removed.  I considered
removing StoppableCondition entirely and just using a Condition on the lock
wrapped by the Stoppable lock class but doing so would require additional work
to make the CancelCriterion available to the code that uses these
StoppableConditions, so other than removing the methods I've left the class
alone.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/88b561d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/88b561d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/88b561d6

Branch: refs/heads/develop
Commit: 88b561d64dc328ca2cf9d36ea4a94bc58d80ff19
Parents: 1ab59ff
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Fri Aug 19 14:29:54 2016 -0700
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Fri Aug 19 14:31:16 2016 -0700

----------------------------------------------------------------------
 .../internal/InternalDistributedSystem.java     | 13 -----
 .../internal/locks/GrantorRequestProcessor.java |  5 +-
 .../internal/cache/ha/HARegionQueue.java        |  8 +--
 .../parallel/ParallelGatewaySenderQueue.java    |  2 +-
 .../util/concurrent/StoppableCondition.java     | 58 ++++----------------
 5 files changed, 20 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index 49a4c97..b9566df 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -422,9 +422,6 @@ public class InternalDistributedSystem
     this.startTime = System.currentTimeMillis();
     this.grc = new GrantorRequestProcessor.GrantorRequestContext(stopper);
     
-    elderLock = new StoppableReentrantLock(stopper);
-    elderLockCondition = elderLock.newCondition();
-
     this.creationStack = TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(this.originalConfig);
 
 //    if (DistributionConfigImpl.multicastTest) {
@@ -1504,16 +1501,6 @@ public class InternalDistributedSystem
     return sb.toString();
   }
 
-  private final StoppableReentrantLock elderLock;
-  private final StoppableCondition elderLockCondition;
-  
-  public StoppableReentrantLock getElderLock() {
-    return elderLock;
-  }
-  public StoppableCondition getElderLockCondition() {
-    return elderLockCondition;
-  }
-  
   /**
    * Returns the current configuration of this distributed system.
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java
index e22a8c0..5a6b867 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/locks/GrantorRequestProcessor.java
@@ -21,6 +21,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
 
 import org.apache.logging.log4j.Logger;
 
@@ -210,8 +212,7 @@ public class GrantorRequestProcessor extends ReplyProcessor21 {
       logger.info(LogMarker.DLS, message);
       boolean interrupted = Thread.interrupted();
       try {
-        grc.elderLockCondition.await();
-        break;
+        grc.elderLockCondition.await(sys.getConfig().getMemberTimeout());
       } catch (InterruptedException ignore) {
         interrupted = true;
         sys.getCancelCriterion().checkCancelInProgress(ignore);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
index 6ac56ff..2b8934e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
@@ -18,6 +18,7 @@ package com.gemstone.gemfire.internal.cache.ha;
 
 import com.gemstone.gemfire.*;
 import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.TimeoutException;
 import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
@@ -48,11 +49,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.*;
 
 /**
  * An implementation of Queue using Gemfire Region as the underlying
@@ -2539,7 +2539,7 @@ public class HARegionQueue implements RegionQueue
         region.getCache().getCancelCriterion().checkCancelInProgress(null);
         boolean interrupted = Thread.currentThread().isInterrupted();
         try {
-           blockCond.await();
+          blockCond.await(StoppableCondition.TIME_TO_WAIT);
         }
         catch (InterruptedException ie) {
           interrupted = true;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 1b5c11f..444a493 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1728,7 +1728,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             try {
               boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
               while (regionToDispatchedKeysMap.isEmpty()) {
-                regionToDispatchedKeysMapEmpty.await();
+                regionToDispatchedKeysMapEmpty.await(StoppableCondition.TIME_TO_WAIT);
               }
               if (wasEmpty) continue;
               // TODO: This should be optimized.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/88b561d6/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java
index 5c33498..04b958a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/StoppableCondition.java
@@ -27,6 +27,8 @@
 
 package com.gemstone.gemfire.internal.util.concurrent;
 
+import static com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch.RETRY_TIME;
+
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -36,25 +38,19 @@ import com.gemstone.gemfire.internal.Assert;
 
 /**
  * This class is functionally equivalent to {@link java.util.concurrent.locks.Condition};
- * however, it does not implement the interface, in an attempt to encourage
- * GemFire API writers to refer to this "stoppable" version instead.
- * <p>
- * It is implemented as a strict "cover" for a genuine {@link java.util.concurrent.locks.Condition}.
- * 
+ * however it only implements the acquire(long) method.  Its purpose is to perform a
+ * cancellation check
  */
-public class StoppableCondition implements /* Condition, */ java.io.Serializable {
-    private static final long serialVersionUID = -7091681525970431937L;
+public class StoppableCondition implements java.io.Serializable {
+  private static final long serialVersionUID = -7091681525970431937L;
+
+  /** The underlying condition **/
+  private final Condition condition;
 
-    /** The underlying condition **/
-    private final Condition condition;
-    
-    /** The cancellation object */
-    private final CancelCriterion stopper;
+  /** The cancellation object */
+  private final CancelCriterion stopper;
 
-  /**
-   * This is how often waiters will wake up to check for cancellation
-   */
-  private static final long RETRY_TIME = 15 * 1000; // milliseconds
+  public static final long TIME_TO_WAIT = 15000;
 
     /**
      * Create a new StoppableCondition based on given condition and
@@ -67,41 +63,11 @@ public class StoppableCondition implements /* Condition, */ java.io.Serializable
         this.stopper = stopper;
     }
 
-    public void awaitUninterruptibly() {
-      for (;;) {
-        boolean interrupted = Thread.interrupted();
-        try {
-          await();
-          break;
-        }
-        catch (InterruptedException e) {
-          interrupted = true;
-        }
-        finally {
-          if (interrupted) Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    public void await() throws InterruptedException {
-      if (Thread.interrupted()) throw new InterruptedException();
-      for (;;) {
-        stopper.checkCancelInProgress(null);
-        if (await(RETRY_TIME))
-          break;
-      }
-    }
-
     public boolean await(long timeoutMs) throws InterruptedException {
         stopper.checkCancelInProgress(null);
         return condition.await(timeoutMs, TimeUnit.MILLISECONDS);
     }
 
-    public boolean awaitUntil(Date deadline) throws InterruptedException {
-      stopper.checkCancelInProgress(null);
-      return condition.awaitUntil(deadline);
-    }
-
     public synchronized void signal() {
       condition.signal();
     }


Mime
View raw message