ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject ignite git commit: IGNITE-2801 Coordinator floods network with partitions full map exchange messages
Date Tue, 29 Mar 2016 11:59:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 7f9ee2d59 -> 6a3d72480


IGNITE-2801 Coordinator floods network with partitions full map exchange messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a3d7248
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a3d7248
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a3d7248

Branch: refs/heads/master
Commit: 6a3d724805e231edca2d8d72891f15a8a729bbc2
Parents: 7f9ee2d
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Mar 29 14:56:21 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Mar 29 14:58:14 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   8 +-
 ...cingDelayedPartitionMapExchangeSelfTest.java |  14 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 186 ++++++++++++++++++-
 .../junits/common/GridCommonAbstractTest.java   |  53 +++++-
 4 files changed, 237 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 54580fd..6de10c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                             break;
                     }
 
-                    // If not first preloading and no more topology events present,
-                    // then we periodically refresh partition map.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
-                        refreshPartitions(timeout);
-
+                    // If not first preloading and no more topology events present.
+                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
-                    }
 
                     // After workers line up and before preloading starts we initialize all
futures.
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index 2890fcb..2c47a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,7 +100,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends
Gri
      * @throws Exception e.
      */
     public void test() throws Exception {
-        startGrid(0);
+        IgniteKernal ignite = (IgniteKernal)startGrid(0);
 
         CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
 
@@ -114,26 +115,29 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest
extends Gri
         startGrid(2);
         startGrid(3);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         for (int i = 0; i < 2; i++) {
             stopGrid(3);
 
-            awaitPartitionMapExchange(true);
+            awaitPartitionMapExchange(true, true);
 
             startGrid(3);
 
-            awaitPartitionMapExchange(true);
+            awaitPartitionMapExchange(true, true);
         }
 
         startGrid(4);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         assert rs.isEmpty();
 
         record = true;
 
+        // Emulate latest GridDhtPartitionsFullMessages.
+        ignite.context().cache().context().exchange().scheduleResendPartitions();
+
         while (rs.size() < 3) { // N - 1 nodes.
             U.sleep(10);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index e4ad66b..f1e5687 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -17,20 +17,40 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -38,6 +58,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+
 /**
  *
  */
@@ -69,6 +92,12 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     /** */
     private volatile boolean concurrentStartFinished3;
 
+    /** */
+    private volatile boolean record = false;
+
+    /** */
+    private final ConcurrentHashMap<Class, AtomicInteger> map = new ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
@@ -76,6 +105,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
+        TcpCommunicationSpi commSpi = new CountingCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        commSpi.setTcpNoDelay(true);
+
+        iCfg.setCommunicationSpi(commSpi);
+
         if (getTestGridName(10).equals(gridName))
             iCfg.setClientMode(true);
 
@@ -173,8 +209,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
                 log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) +
"% entries. [count=" + TEST_SIZE +
                     ", iteration=" + iter + ", cache=" + name + "]");
 
-            assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) :
-                i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")";
+            assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")",
+                ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter));
+
         }
     }
 
@@ -189,7 +226,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testSimpleRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
+        IgniteKernal ignite = (IgniteKernal)startGrid(0);
 
         generateData(ignite, 0, 0);
 
@@ -202,19 +239,43 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         waitForRebalancing(0, 2);
         waitForRebalancing(1, 2);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         stopGrid(0);
 
         waitForRebalancing(1, 3);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         startGrid(2);
 
         waitForRebalancing(1, 4);
         waitForRebalancing(2, 4);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         stopGrid(2);
 
         waitForRebalancing(1, 5);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         long spend = (System.currentTimeMillis() - start) / 1000;
 
         checkData(grid(1), 0, 0);
@@ -277,7 +338,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
         concurrentStartFinished = true;
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -348,12 +409,79 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
                 Map map = U.field(supplier, "scMap");
 
                 synchronized (map) {
-                    assert map.isEmpty();
+                    assertTrue(map.isEmpty());
                 }
             }
         }
     }
 
+    protected void checkPartitionMapExchangeFinished() {
+        for (Ignite g : G.allGrids()) {
+            IgniteKernal g0 = (IgniteKernal)g;
+
+            for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) {
+                CacheConfiguration cfg = c.context().config();
+
+                if (cfg.getCacheMode() != LOCAL && cfg.getRebalanceMode() != NONE)
{
+                    GridDhtCacheAdapter<?, ?> dht = dht(c);
+
+                    GridDhtPartitionTopology top = dht.topology();
+
+                    List<GridDhtLocalPartition> locs = top.localPartitions();
+
+                    for (GridDhtLocalPartition loc : locs) {
+                        assertTrue("Wrong partition state, should be OWNING [state=" + loc.state() + "]",
+                            loc.state() == GridDhtPartitionState.OWNING);
+
+                        Collection<ClusterNode> affNodes =
+                            g0.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(loc.id());
+
+                        assertTrue(affNodes.contains(g0.localNode()));
+                    }
+
+                    for (Ignite remote : G.allGrids()) {
+                        IgniteKernal remote0 = (IgniteKernal)remote;
+
+                        IgniteCacheProxy<?, ?> remoteC = remote0.context().cache().jcache(cfg.getName());
+
+                        GridDhtCacheAdapter<?, ?> remoteDht = dht(remoteC);
+
+                        GridDhtPartitionTopology remoteTop = remoteDht.topology();
+
+                        GridDhtPartitionMap2 pMap = remoteTop.partitionMap(true).get(((IgniteKernal)g).getLocalNodeId());
+
+                        assertEquals(pMap.size(), locs.size());
+
+                        for (Map.Entry entry : pMap.entrySet()) {
+                            assertTrue("Wrong partition state, should be OWNING [state=" + entry.getValue() + "]",
+                                entry.getValue() == GridDhtPartitionState.OWNING);
+                        }
+
+                        for (GridDhtLocalPartition loc : locs) {
+                            assertTrue(pMap.containsKey(loc.id()));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    protected void checkPartitionMapMessagesAbsent() throws IgniteInterruptedCheckedException
{
+        map.clear();
+
+        record = true;
+
+        U.sleep(30_000);
+
+        record = false;
+
+        AtomicInteger iF = map.get(GridDhtPartitionsFullMessage.class);
+        AtomicInteger iS = map.get(GridDhtPartitionsSingleMessage.class);
+
+        assertTrue(iF == null || iF.get() == 1); // 1 message can be sent right after all checks passed.
+        assertTrue(iS == null);
+    }
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 10 * 60_000;
@@ -446,7 +574,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         waitForRebalancing(3, 5, 1);
         waitForRebalancing(4, 5, 1);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -470,7 +598,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -480,7 +608,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         waitForRebalancing(3, 7);
         waitForRebalancing(4, 7);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -489,7 +617,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         waitForRebalancing(3, 8);
         waitForRebalancing(4, 8);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
 
         checkSupplyContextMapIsEmpty();
 
@@ -514,4 +646,40 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
         stopAllGrids();
     }
+
+    /**
+     *
+     */
+    private class CountingCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(final ClusterNode node, final Message msg,
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
{
+            final Object msg0 = ((GridIoMessage)msg).message();
+
+            recordMessage(msg0);
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * @param msg
+         */
+        private void recordMessage(Object msg) {
+            if (record) {
+                Class id = msg.getClass();
+
+                AtomicInteger ai = map.get(id);
+
+                if (ai == null) {
+                    ai = new AtomicInteger();
+
+                    AtomicInteger oldAi = map.putIfAbsent(id, ai);
+
+                    (oldAi != null ? oldAi : ai).incrementAndGet();
+                }
+                else
+                    ai.incrementAndGet();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4fcc1ed..e53ec56 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -416,15 +418,18 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
      */
     @SuppressWarnings("BusyWait")
     protected void awaitPartitionMapExchange() throws InterruptedException {
-        awaitPartitionMapExchange(false);
+        awaitPartitionMapExchange(false, false);
     }
 
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
+     * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update
finished.
      * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
-    protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException
{
+    protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate)
throws InterruptedException {
+        long timeout = 30_000;
+
         for (Ignite g : G.allGrids()) {
             IgniteKernal g0 = (IgniteKernal)g;
 
@@ -468,7 +473,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
                                 GridDhtLocalPartition loc = top.localPartition(p, readyVer,
false);
 
                                 if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)
||
-                                    (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) {
+                                    (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING)) {
                                     LT.warn(log(), null, "Waiting for topology map update
[" +
                                         "grid=" + g.name() +
                                         ", cache=" + cfg.getName() +
@@ -501,7 +506,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
                                 if (i == 0)
                                     start = System.currentTimeMillis();
 
-                                if (System.currentTimeMillis() - start > 30_000) {
+                                if (System.currentTimeMillis() - start > timeout) {
                                     U.dumpThreads(log);
 
                                     throw new IgniteException("Timeout of waiting for topology
map update [" +
@@ -526,6 +531,46 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest
{
                             break;
                         }
                     }
+
+                    if (waitNode2PartUpdate) {
+                        long start = System.currentTimeMillis();
+
+                        boolean failed = true;
+
+                        while (failed) {
+                            failed = false;
+
+                            for (GridDhtPartitionMap2 pMap : top.partitionMap(true).values())
{
+                                if (failed)
+                                    break;
+
+                                for (Map.Entry entry : pMap.entrySet()) {
+                                    if (System.currentTimeMillis() - start > timeout)
{
+                                        U.dumpThreads(log);
+
+                                        throw new IgniteException("Timeout of waiting for partition state update [" +
+                                            "grid=" + g.name() +
+                                            ", cache=" + cfg.getName() +
+                                            ", cacheId=" + dht.context().cacheId() +
+                                            ", topVer=" + top.topologyVersion() +
+                                            ", locNode=" + g.cluster().localNode() + ']');
+                                    }
+
+                                    if (entry.getValue() != GridDhtPartitionState.OWNING)
{
+                                        LT.warn(log(), null,
+                                            "Waiting for correct partition state, should be OWNING [state=" +
+                                                entry.getValue() + "]");
+
+                                        Thread.sleep(200); // Busy wait.
+
+                                        failed = true;
+
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
                 }
             }
         }


Mime
View raw message