ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held.
Date Wed, 23 Dec 2015 07:03:56 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 118f29ae0 -> 70c182f8b


ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock
is held.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70c182f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70c182f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70c182f8

Branch: refs/heads/ignite-1537
Commit: 70c182f8be9c5a8aa6af3b3c6b501bb9ea19b78d
Parents: 118f29a
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 23 09:18:13 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 23 09:51:53 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 39 +++++++++++---------
 .../dht/atomic/GridDhtAtomicCache.java          |  1 +
 .../ignite/internal/util/lang/GridFunc.java     |  1 +
 ...niteClientReconnectFailoverAbstractTest.java |  3 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java | 20 +++++++---
 .../cache/GridCacheAbstractSelfTest.java        |  3 +-
 .../GridServiceProcessorStopSelfTest.java       | 21 ++++++-----
 7 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bf7c7e4..42f8dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -666,6 +666,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      *
      * @param plc Policy.
      * @return Execution pool.
+     * @throws IgniteCheckedException If failed.
      */
     private Executor pool(byte plc) throws IgniteCheckedException {
         switch (plc) {
@@ -767,6 +768,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished.
+     * @throws IgniteCheckedException If failed.
      */
     private void processRegularMessage(
         final UUID nodeId,
@@ -824,6 +826,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param msg Ordered message.
      * @param plc Execution policy.
      * @param msgC Closure to call when message processing finished ({@code null} for sync
processing).
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     private void processOrderedMessage(
@@ -1029,7 +1032,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -1041,7 +1044,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         boolean ordered,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1062,8 +1065,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             else
                 processRegularMessage0(ioMsg, locNodeId);
 
-            if (ackClosure != null)
-                ackClosure.apply(null);
+            if (ackC != null)
+                ackC.apply(null);
         }
         else {
             if (topicOrd < 0)
@@ -1071,7 +1074,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             try {
                 if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
-                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg,
ackClosure);
+                    ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg,
ackC);
                 else
                     getSpi().sendMessage(node, ioMsg);
             }
@@ -1197,12 +1200,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc,
-        IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException
{
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure);
+        IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1233,12 +1236,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException>
ackClosure)
+    public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException>
ackC)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false, ackClosure);
+        send(node, topic, -1, msg, plc, false, 0, false, ackC);
     }
 
     /**
@@ -1280,7 +1283,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1290,11 +1293,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
      /**
@@ -1385,6 +1388,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to subscribe to.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID,
?> p) {
         if (p != null) {
             try {
@@ -1406,6 +1410,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param topic Topic to unsubscribe from.
      * @param p Message predicate.
      */
+    @SuppressWarnings("unchecked")
     public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID,
?> p) {
         try {
             removeMessageListener(TOPIC_COMM_USER,
@@ -1423,7 +1428,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
-     * @param ackClosure Ack closure.
+     * @param ackC Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1433,7 +1438,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         long timeout,
         boolean skipOnTimeout,
-        IgniteInClosure<IgniteException> ackClosure
+        IgniteInClosure<IgniteException> ackC
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
@@ -1442,7 +1447,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node (has node left
grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6942d87..634a9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1292,6 +1292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                         IgniteTxManager tm = ctx.tm();
 
+                        // Needed for metadata cache transaction.
                         boolean set = tm.setTxTopologyHint(req.topologyVersion());
 
                         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 8d5a8e7..8eeca6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3408,6 +3408,7 @@ public class GridFunc {
      * @return First element in given collection for which predicate evaluates to
      *      {@code true} - or {@code null} if such element cannot be found.
      */
+    @SafeVarargs
     @Nullable public static <V> V find(Iterable<? extends V> c, @Nullable V dfltVal,
         @Nullable IgnitePredicate<? super V>... p) {
         A.notNull(c, "c");

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index f050c72..7e217b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -117,7 +117,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends
IgniteCl
                     }
 
                     return null;
-                } catch (Throwable e) {
+                }
+                catch (Throwable e) {
                     log.error("Unexpected error in operation thread: " + e, e);
 
                     stop.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 5b294cc..2c2ec3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -3277,9 +3277,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     public void testPeekExpired() throws Exception {
-        IgniteCache<String, Integer> c = jcache();
+        final IgniteCache<String, Integer> c = jcache();
 
-        String key = primaryKeysForCache(c, 1).get(0);
+        final String key = primaryKeysForCache(c, 1).get(0);
 
         info("Using key: " + key);
 
@@ -3295,6 +3295,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         Thread.sleep(ttl + 100);
 
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return peek(c, key) == null;
+            }
+        }, 2000);
+
         assert peek(c, key) == null;
 
         assert c.localSize() == 0 : "Cache is not empty.";
@@ -3307,9 +3313,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      */
     public void testPeekExpiredTx() throws Exception {
         if (txShouldBeUsed()) {
-            IgniteCache<String, Integer> c = jcache();
+            final IgniteCache<String, Integer> c = jcache();
 
-            String key = "1";
+            final String key = "1";
             int ttl = 500;
 
             try (Transaction tx = grid(0).transactions().txStart()) {
@@ -3320,7 +3326,11 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
                 tx.commit();
             }
 
-            Thread.sleep(ttl + 100);
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return c.localPeek(key, ONHEAP) == null;
+                }
+            }, 2000);
 
             assertNull(c.localPeek(key, ONHEAP));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 52fbf4c..b3d1384 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -416,9 +416,8 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest
{
      * @param cache Cache projection.
      * @param key Key.
      * @return Value.
-     * @throws Exception If failed.
      */
-    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) throws
Exception {
+    @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) {
         return offheapTiered(cache) ? cache.localPeek(key, CachePeekMode.SWAP, CachePeekMode.OFFHEAP)
:
             cache.localPeek(key, CachePeekMode.ONHEAP);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70c182f8/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index dfea37a..92b18ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
 
         final Ignite ignite = startGrid(0);
 
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-                Thread.currentThread().setName("deploy-thread");
-
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>()
{
+            @Override public Void call() throws Exception {
                 IgniteServices svcs = ignite.services();
 
                 IgniteServices services = svcs.withAsync();
@@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
                 catch (IgniteException e) {
                     finishLatch.countDown();
                 }
-                catch (Throwable e) {
-                    log.error("Service deployment error: ", e);
+                finally {
+                    finishLatch.countDown();
                 }
-            }
-        });
 
-        t.start();
+                return null;
+            }
+        }, "deploy-thread");
 
         depLatch.await();
 
@@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
             U.dumpThreads(log);
 
         assertTrue("Deploy future isn't completed", wait);
+
+        fut.get();
     }
 
     /**


Mime
View raw message