ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject ignite git commit: IGNITE-3515: NullPointerException when stopping IgniteSemaphore and no method has been called previously to initialize semaphore with initializeSemaphore(). Reviewed by Denis Magda.
Date Fri, 22 Jul 2016 11:53:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/master a989f04ec -> ab4963a68


IGNITE-3515: NullPointerException when stopping IgniteSemaphore and no method has been called
previously to initialize semaphore with initializeSemaphore().
Reviewed by Denis Magda.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ab4963a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ab4963a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ab4963a6

Branch: refs/heads/master
Commit: ab4963a686ea7ca560604079c52b8939b605cfcf
Parents: a989f04
Author: Krome Plasma <remi.malnar@some.invalid.address>
Authored: Fri Jul 22 14:53:39 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Fri Jul 22 14:53:39 2016 +0300

----------------------------------------------------------------------
 .../datastructures/GridCacheSemaphoreImpl.java  | 108 ++++++++++++-------
 .../IgniteSemaphoreAbstractSelfTest.java        |  25 ++++-
 2 files changed, 94 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ab4963a6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 8f196be..a11c79d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -51,11 +52,11 @@ import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
- * Cache semaphore implementation based on AbstractQueuedSynchronizer.
- * Current implementation supports only unfair semaphores.
- * If any node fails after acquiring permissions on cache semaphore, there are two different
behaviors controlled with the
- * parameter failoverSafe. If this parameter is true, other nodes can reacquire permits that
were acquired by the failing node.
- * In case this parameter is false, IgniteInterruptedException is called on every node waiting
on this semaphore.
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation
supports only unfair
+ * semaphores. If any node fails after acquiring permissions on cache semaphore, there are
two different behaviors
+ * controlled with the parameter failoverSafe. If this parameter is true, other nodes can
reacquire permits that were
+ * acquired by the failing node. In case this parameter is false, IgniteInterruptedException
is called on every node
+ * waiting on this semaphore.
  */
 public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable
{
     /** */
@@ -104,8 +105,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
     }
 
     /**
-     * Synchronization implementation for semaphore.
-     * Uses AQS state to represent permits.
+     * Synchronization implementation for semaphore. Uses AQS state to represent permits.
      */
     final class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1192457210091910933L;
@@ -155,10 +155,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
          * Get number of permits for node.
          *
          * @param nodeID Node ID.
-         * @return Number of permits node has acquired at this semaphore. Can be less than
0 if
-         * more permits were released than acquired on node.
+         * @return Number of permits node has acquired at this semaphore. Can be less than
0 if more permits were
+         * released than acquired on node.
          */
-        public int getPermitsForNode(UUID nodeID){
+        public int getPermitsForNode(UUID nodeID) {
             return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0;
         }
 
@@ -182,12 +182,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
         }
 
         /**
-         * Set a flag indicating that it is not safe to continue using this semaphore.
-         * This is the case only if one of two things happened:
-         * 1. A node that previously acquired on this semaphore failed and
-         * semaphore is created in non-failoversafe mode;
-         * 2. Local node failed (is closed), so any any threads on this node
-         * waiting to acquire are notified, and semaphore is not safe to be used anymore.
+         * Set a flag indicating that it is not safe to continue using this semaphore. This
is the case only if one of
+         * two things happened: 1. A node that previously acquired on this semaphore failed
and semaphore is created in
+         * non-failoversafe mode; 2. Local node failed (is closed), so any any threads on
this node waiting to acquire
+         * are notified, and semaphore is not safe to be used anymore.
          *
          * @return True is semaphore is not safe to be used anymore.
          */
@@ -195,11 +193,12 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
             return broken;
         }
 
-        /** Flag indicating that a node failed and it is not safe to continue using this
semaphore.
-         * Any attempt to acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}.
+        /**
+         * Flag indicating that a node failed and it is not safe to continue using this semaphore.
Any attempt to
+         * acquire on broken semaphore will result in {@linkplain IgniteInterruptedException}.
          *
          * @param broken True if semaphore should not be used anymore.
-         * */
+         */
         protected void setBroken(boolean broken) {
             this.broken = broken;
         }
@@ -211,9 +210,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
          * @return Negative number if thread should block, positive if thread successfully
acquires permits.
          */
         final int nonfairTryAcquireShared(int acquires) {
-            for (;;) {
+            for (; ; ) {
                 // If broken, return immediately, exception will be thrown anyway.
-                if(broken)
+                if (broken)
                     return 1;
 
                 int available = getState();
@@ -238,9 +237,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
             if (releases == 0)
                 return true;
 
-            for (;;) {
+            for (; ; ) {
                 // If broken, return immediately, exception will be thrown anyway.
-                if(broken)
+                if (broken)
                     return true;
 
                 int cur = getState();
@@ -261,9 +260,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
          * @return Number of permits to drain.
          */
         final int drainPermits() {
-            for (;;) {
+            for (; ; ) {
                 // If broken, return immediately, exception will be thrown anyway.
-                if(broken)
+                if (broken)
                     return 1;
 
                 int current = getState();
@@ -288,7 +287,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
                         @Override public Boolean call() throws Exception {
                             try (IgniteInternalTx tx = CU.txStartInternal(ctx,
                                 semView,
-                                    PESSIMISTIC, REPEATABLE_READ)
+                                PESSIMISTIC, REPEATABLE_READ)
                             ) {
                                 GridCacheSemaphoreState val = semView.get(key);
 
@@ -304,11 +303,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
                                     if (!draining) {
                                         UUID nodeID = ctx.localNodeId();
 
-                                        Map<UUID,Integer> map = val.getWaiters();
+                                        Map<UUID, Integer> map = val.getWaiters();
 
                                         int waitingCnt = expVal - newVal;
 
-                                        if(map.containsKey(nodeID))
+                                        if (map.containsKey(nodeID))
                                             waitingCnt += map.get(nodeID);
 
                                         map.put(nodeID, waitingCnt);
@@ -370,9 +369,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
                                     throw new IgniteCheckedException("Failed to find semaphore
with given name: " +
                                         name);
 
-                                Map<UUID,Integer> map = val.getWaiters();
+                                Map<UUID, Integer> map = val.getWaiters();
 
-                                if(!map.containsKey(nodeId)){
+                                if (!map.containsKey(nodeId)) {
                                     tx.rollback();
 
                                     return false;
@@ -380,7 +379,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
 
                                 int numPermits = map.get(nodeId);
 
-                                if(numPermits > 0)
+                                if (numPermits > 0)
                                     val.setCount(val.getCount() + numPermits);
 
                                 map.remove(nodeId);
@@ -552,7 +551,42 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
         }
     }
 
+    /** {@inheritDoc} */
     @Override public void stop() {
+        if (initGuard.get()) {
+            try {
+                // Wait while initialization is in progress.
+                U.await(initLatch);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.error("Failed waiting while initialization is completed.", e);
+            }
+        }
+        else {
+            // Preventing concurrent initialization.
+            if (initGuard.compareAndSet(false, true)) {
+                initLatch.countDown();
+
+                if (log.isDebugEnabled())
+                    log.debug("Semaphore wasn't initialized. Prevented further initialization.");
+
+                return;
+            }
+            else {
+                try {
+                    // Wait while initialization is in progress.
+                    U.await(initLatch);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.error("Failed waiting while initialization is completed.", e);
+                }
+            }
+        }
+
+        assert sync != null;
+
         sync.setBroken(true);
 
         // Try to notify any waiting threads.
@@ -580,7 +614,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
 
             sync.acquireSharedInterruptibly(permits);
 
-            if(isBroken())
+            if (isBroken())
                 throw new InterruptedException();
         }
         catch (IgniteCheckedException e) {
@@ -697,7 +731,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
 
             boolean result = sync.nonfairTryAcquireShared(1) >= 0;
 
-            if(isBroken())
+            if (isBroken())
                 throw new InterruptedException();
 
             return result;
@@ -722,7 +756,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
 
             boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
 
-            if(isBroken())
+            if (isBroken())
                 throw new InterruptedException();
 
             return result;
@@ -789,9 +823,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
         try {
             initializeSemaphore();
 
-            boolean result =  sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+            boolean result = sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
 
-            if(isBroken())
+            if (isBroken())
                 throw new InterruptedException();
 
             return result;
@@ -859,7 +893,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx,
Exter
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isBroken(){
+    @Override public boolean isBroken() {
         ctx.kernalContext().gateway().readLock();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ab4963a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index 200e276..5241dd1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
@@ -39,6 +40,7 @@ import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Rule;
@@ -234,6 +236,25 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
     /**
      * @throws Exception If failed.
      */
+    public void testSemaphoreClosing() throws Exception {
+        IgniteConfiguration cfg;
+        GridStringLogger stringLogger;
+
+        stringLogger = new GridStringLogger();
+
+        cfg = optimize(getConfiguration("npeGrid"));
+        cfg.setGridLogger(stringLogger);
+
+        try (Ignite ignite = startGrid(cfg.getGridName(), cfg)) {
+            ignite.semaphore("semaphore", 1, true, true);
+        }
+
+        assertFalse(stringLogger.toString().contains(NullPointerException.class.getName()));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void checkSemaphoreSerialization() throws Exception {
         final IgniteSemaphore sem = grid(0).semaphore("semaphore", -gridCount() + 1, true,
true);
 
@@ -276,8 +297,8 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
     }
 
     /**
-     * This method only checks if parameter of new semaphore is initialized properly.
-     * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest.
+     * This method only checks if parameter of new semaphore is initialized properly. For
tests considering failure
+     * recovery see
      *
      * @throws Exception Exception.
      */


Mime
View raw message