ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1027 Fixed early rebalance sync future completion.
Date Thu, 03 Dec 2015 07:45:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1.5 9b60c75c6 -> ad9e4db5b


ignite-1027 Fixed early rebalance sync future completion.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad9e4db5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad9e4db5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad9e4db5

Branch: refs/heads/ignite-1.5
Commit: ad9e4db5b87b064d13db4f9251c25efd535fb9e8
Parents: 9b60c75
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Dec 3 10:45:30 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Dec 3 10:45:30 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 17 ++--
 .../processors/cache/GridCachePreloader.java    | 11 ++-
 .../dht/preloader/GridDhtPartitionDemander.java | 41 ++++----
 .../dht/preloader/GridDhtPreloader.java         |  4 +-
 .../preloader/GridDhtPreloaderAssignments.java  | 19 +++-
 .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 37 +++++---
 ...cingDelayedPartitionMapExchangeSelfTest.java |  9 +-
 .../GridCacheRebalancingAsyncSelfTest.java      |  3 +-
 .../GridCacheRebalancingSyncCheckDataTest.java  | 98 ++++++++++++++++++++
 .../GridCacheRebalancingSyncSelfTest.java       | 55 +++++------
 ...eRebalancingUnmarshallingFailedSelfTest.java |  6 +-
 .../testsuites/IgniteCacheTestSuite3.java       |  2 +
 12 files changed, 223 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b13a5af..a0f7f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1399,8 +1399,10 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                                         waitList.add(cctx.cacheContext(cId).name());
                                 }
 
-                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(
-                                    assignsMap.get(cacheId), forcePreload, waitList, cnt);
+                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                    forcePreload,
+                                    waitList,
+                                    cnt);
 
                                 if (r != null) {
                                     U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name()
+
@@ -1425,7 +1427,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name()
+
                                     ", node=" + exchFut.discoveryEvent().eventNode().id()
+ ']');
 
-                                if (marshR != null)
+                                if (marshR != null) {
                                     try {
                                         marshR.call(); //Marshaller cache rebalancing launches
in sync way.
                                     }
@@ -1435,6 +1437,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                                         continue;
                                     }
+                                }
 
                                 final GridFutureAdapter fut = new GridFutureAdapter();
 
@@ -1463,17 +1466,19 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                                             fut.onDone();
                                         }
                                     }
-                                }, /*system pool*/ true);
+                                }, /*system pool*/true);
                             }
-                            else
+                            else {
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) "
+
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name()
+
                                     ", node=" + exchFut.discoveryEvent().eventNode().id()
+ ']');
+                            }
                         }
-                        else
+                        else {
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
                                 "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name()
+
                                 ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 8e1164b..c8fcb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -86,9 +86,9 @@ public interface GridCachePreloader {
 
     /**
      * @param exchFut Exchange future to assign.
-     * @return Assignments.
+     * @return Assignments or {@code null} if detected that there are pending exchanges.
      */
-    public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
@@ -97,9 +97,12 @@ public interface GridCachePreloader {
      * @param forcePreload Force preload flag.
      * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
+     * @return Rebalancing closure.
      */
-    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
boolean forcePreload,
-        Collection<String> caches, int cnt);
+    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload,
+        Collection<String> caches,
+        int cnt);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index eb9e97f..ced0d10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -251,6 +251,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param name Cache name.
      * @param fut Future.
+     * @throws IgniteCheckedException If failed.
      */
     private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException
{
         if (log.isDebugEnabled())
@@ -283,7 +284,7 @@ public class GridDhtPartitionDemander {
      * @param force {@code True} if dummy reassign.
      * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @throws IgniteCheckedException If failed.
+     * @return Rebalancing closure.
      */
     Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean
force,
         final Collection<String> caches, int cnt) {
@@ -293,25 +294,24 @@ public class GridDhtPartitionDemander {
         long delay = cctx.config().getRebalanceDelay();
 
         if (delay == 0 || force) {
-            assert assigns != null;
-
             final RebalanceFuture oldFut = rebalanceFut;
 
             final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(),
cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
-            else
+            else {
                 fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> future)
{
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut)
{
                         oldFut.onDone(fut.result());
                     }
                 });
+            }
 
             rebalanceFut = fut;
 
             if (assigns.isEmpty()) {
-                fut.doneIfEmpty();
+                fut.doneIfEmpty(assigns.cancelled());
 
                 return null;
             }
@@ -357,6 +357,9 @@ public class GridDhtPartitionDemander {
 
     /**
      * @param fut Future.
+     * @param assigns Assignments.
+     * @throws IgniteCheckedException If failed.
+     * @return
      */
     private boolean requestPartitions(
         RebalanceFuture fut,
@@ -370,7 +373,7 @@ public class GridDhtPartitionDemander {
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
-            fut.appendPartitions(node.id(), d.partitions());//Future preparation.
+            fut.appendPartitions(node.id(), d.partitions()); //Future preparation.
         }
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet())
{
@@ -413,7 +416,8 @@ public class GridDhtPartitionDemander {
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
                         synchronized (fut) {
-                            if (!fut.isDone())// Future can be already cancelled at this
moment and all failovers happened.
+                            if (!fut.isDone())
+                                // Future can be already cancelled at this moment and all
failovers happened.
                                 // New requests will not be covered by failovers.
                                 cctx.io().sendOrderedMessage(node,
                                     rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
@@ -427,9 +431,12 @@ public class GridDhtPartitionDemander {
                 }
             }
             else {
-                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode="
+ cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
-                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq
+ "]");
+                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
+                    ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() +
+                    ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() +
+                    ", updateSeq=" + fut.updateSeq + "]");
 
                 d.timeout(cctx.config().getRebalanceTimeout());
                 d.workerId(0);//old api support.
@@ -832,9 +839,9 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         *
+         * @param cancelled Is cancelled.
          */
-        private void doneIfEmpty() {
+        private void doneIfEmpty(boolean cancelled) {
             synchronized (this) {
                 if (isDone())
                     return;
@@ -845,14 +852,14 @@ public class GridDhtPartitionDemander {
                     log.debug("Rebalancing is not required [cache=" + cctx.name() +
                         ", topology=" + topVer + "]");
 
-                checkIsDone();
+                checkIsDone(cancelled);
             }
         }
 
         /**
          * Cancels this future.
          *
-         * @return {@code true}.
+         * @return {@code True}.
          */
         @Override public boolean cancel() {
             synchronized (this) {
@@ -860,7 +867,7 @@ public class GridDhtPartitionDemander {
                     return true;
 
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion());
+                    + ", topology=" + topologyVersion() + ']');
 
                 if (!cctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
@@ -1012,7 +1019,7 @@ public class GridDhtPartitionDemander {
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
 
                 if (log.isDebugEnabled())
-                    log.debug("Completed rebalance future.");
+                    log.debug("Completed rebalance future: " + this);
 
                 cctx.shared().exchange().scheduleResendPartitions();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 3e3cee3..9a6246f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -324,7 +324,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Skipping assignments creation, exchange worker has pending
assignments: " +
                         exchFut.exchangeId());
 
-                break;
+                assigns.cancelled(true);
+
+                return assigns;
             }
 
             // If partition belongs to local node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 3583967..3f82c9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -37,19 +37,36 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
     /** Last join order. */
     private final AffinityTopologyVersion topVer;
 
+    /** */
+    private boolean cancelled;
+
     /**
      * @param exchFut Exchange future.
      * @param topVer Last join order.
      */
     public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion
topVer) {
         assert exchFut != null;
-        assert topVer.topologyVersion() > 0;
+        assert topVer.topologyVersion() > 0 : topVer;
 
         this.exchFut = exchFut;
         this.topVer = topVer;
     }
 
     /**
+     * @return {@code True} if assignments creation was cancelled.
+     */
+    public boolean cancelled() {
+        return cancelled;
+    }
+
+    /**
+     * @param cancelled {@code True} if assignments creation was cancelled.
+     */
+    public void cancelled(boolean cancelled) {
+        this.cancelled = cancelled;
+    }
+
+    /**
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture exchangeFuture() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
index 9d6e82f..0b610f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadDelayedSelfTest.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CachePeekMode;
@@ -35,7 +34,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
@@ -51,6 +49,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -107,7 +106,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         stopAllGrids();
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testManualPreload() throws Exception {
         delay = -1;
 
@@ -184,7 +185,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         checkCache(c2, cnt);
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testDelayedPreload() throws Exception {
         delay = PRELOAD_DELAY;
 
@@ -238,9 +241,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
 
         checkMaps(false, d0, d1, d2);
 
-        assert l1.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+        assert l1.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS);
 
-        assert l2.await(PRELOAD_DELAY * 3 / 2, TimeUnit.MILLISECONDS);
+        assert l2.await(PRELOAD_DELAY * 3 / 2, MILLISECONDS);
 
         U.sleep(1000);
 
@@ -253,7 +256,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         checkCache(c2, cnt);
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testAutomaticPreload() throws Exception {
         delay = 0;
         preloadMode = CacheRebalanceMode.SYNC;
@@ -284,7 +289,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         checkCache(c2, cnt);
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testAutomaticPreloadWithEmptyCache() throws Exception {
         preloadMode = SYNC;
 
@@ -331,7 +338,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         }
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testManualPreloadSyncMode() throws Exception {
         preloadMode = CacheRebalanceMode.SYNC;
         delay = -1;
@@ -344,7 +353,9 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
         }
     }
 
-    /** @throws Exception If failed. */
+    /**
+     * @throws Exception If failed.
+     */
     public void testPreloadManyNodes() throws Exception {
         delay = 0;
         preloadMode = ASYNC;
@@ -419,9 +430,11 @@ public class GridCacheDhtPreloadDelayedSelfTest extends GridCommonAbstractTest
{
      *
      * @param strict Strict check flag.
      * @param caches Maps to compare.
+     * @throws Exception If failed.
      */
-    private void checkMaps(final boolean strict, final GridDhtCacheAdapter<String, Integer>...
caches)
-        throws IgniteInterruptedCheckedException {
+    @SafeVarargs
+    private final void checkMaps(final boolean strict, final GridDhtCacheAdapter<String,
Integer>... caches)
+        throws Exception {
         if (caches.length < 2)
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index a1ea7ad..2890fcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -73,20 +73,20 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends
Gri
     public class DelayableCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
         @Override public void sendMessage(final ClusterNode node, final Message msg,
-            final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException
{
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException
{
             final Object msg0 = ((GridIoMessage)msg).message();
 
             if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
                 ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
                 rs.putIfAbsent(node.id(), new Runnable() {
                     @Override public void run() {
-                        DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+                        DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
                     }
                 });
             }
             else
                 try {
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
                 catch (Exception e) {
                     U.log(null, e);
@@ -144,9 +144,8 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends
Gri
 
         awaitPartitionMapExchange();
 
-        for (Runnable r : rs.values()) {
+        for (Runnable r : rs.values())
             r.run();
-        }
 
         U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
index 7759c70..bcda0da 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -33,9 +33,8 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
 
-        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration())
             cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
-        }
 
         return iCfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
new file mode 100644
index 0000000..5e4a5c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncCheckDataTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setRebalanceMode(SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDataRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        final int KEYS = 10_000;
+
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        for (int i = 0; i < KEYS; i++)
+            cache.put(i, i);
+
+
+        for (int i = 0; i < 3; i++) {
+            log.info("Iteration: " + i);
+
+            final AtomicInteger idx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try(Ignite ignite = startGrid(idx.getAndIncrement())) {
+                        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+                        for (int i = 0; i < KEYS; i++)
+                            assertNotNull(cache.localPeek(i));
+                    }
+
+                    return null;
+                }
+            }, 5, "start-node");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 8c5cd40..3b25bd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -45,19 +45,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static int TEST_SIZE = 100_000;
+    private static final int TEST_SIZE = 100_000;
 
     /** partitioned cache name. */
-    protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+    protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
 
     /** partitioned cache 2 name. */
-    protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+    protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
 
     /** replicated cache name. */
-    protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+    protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
 
     /** replicated cache 2 name. */
-    protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+    protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
 
     /** */
     private volatile boolean concurrentStartFinished;
@@ -122,6 +122,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
     /**
      * @param ignite Ignite.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
     protected void generateData(Ignite ignite, int from, int iter) {
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
@@ -132,6 +134,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
     /**
      * @param ignite Ignite.
+     * @param name Cache name.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
     protected void generateData(Ignite ignite, String name, int from, int iter) {
         for (int i = from; i < from + TEST_SIZE; i++) {
@@ -144,9 +149,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
     /**
      * @param ignite Ignite.
-     * @throws IgniteCheckedException Exception.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
-    protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException
{
+    protected void checkData(Ignite ignite, int from, int iter) {
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
         checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
@@ -155,10 +161,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
     /**
      * @param ignite Ignite.
+     * @param from Start from key.
+     * @param iter Iteration.
      * @param name Cache name.
-     * @throws IgniteCheckedException Exception.
      */
-    protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException
{
+    protected void checkData(Ignite ignite, String name, int from, int iter) {
         for (int i = from; i < from + TEST_SIZE; i++) {
             if (i % (TEST_SIZE / 10) == 0)
                 log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries
(" + TEST_SIZE + ").");
@@ -169,7 +176,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @throws Exception Exception
+     * @throws Exception If failed.
      */
     public void testSimpleRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
@@ -206,7 +213,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     }
 
     /**
-     * @throws Exception Exception
+     * @throws Exception If failed.
      */
     public void testLoadRebalancing() throws Exception {
         final Ignite ignite = startGrid(0);
@@ -240,14 +247,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
         Thread t2 = new Thread() {
             @Override public void run() {
-                while (!concurrentStartFinished) {
-                    try {
-                        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
-                    }
-                    catch (IgniteCheckedException e) {
-                        e.printStackTrace();
-                    }
-                }
+                while (!concurrentStartFinished)
+                    checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
             }
         };
 
@@ -282,7 +283,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
      * @param id Node id.
      * @param major Major ver.
      * @param minor Minor ver.
-     * @throws IgniteCheckedException Exception.
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException
{
         waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
@@ -291,7 +292,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     /**
      * @param id Node id.
      * @param major Major ver.
-     * @throws IgniteCheckedException Exception.
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
         waitForRebalancing(id, new AffinityTopologyVersion(major));
@@ -300,7 +301,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     /**
      * @param id Node id.
      * @param top Topology version.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException
{
         boolean finished = false;
@@ -327,6 +328,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
     /**
      *
      */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     protected void checkSupplyContextMapIsEmpty() {
         for (Ignite g : G.allGrids()) {
             for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
@@ -342,12 +344,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         }
     }
 
+    /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 5 * 60_000;
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testComplexRebalancing() throws Exception {
         final Ignite ignite = startGrid(0);
@@ -368,9 +371,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
                     startGrid(1);
                     startGrid(2);
 
-                    while (!concurrentStartFinished2) {
+                    while (!concurrentStartFinished2)
                         U.sleep(10);
-                    }
 
                     waitForRebalancing(0, 5, 0);
                     waitForRebalancing(1, 5, 0);
@@ -387,9 +389,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
 
                     grid(0).getOrCreateCache(cacheRCfg);
 
-                    while (!concurrentStartFinished3) {
+                    while (!concurrentStartFinished3)
                         U.sleep(10);
-                    }
 
                     concurrentStartFinished = true;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 506f1c2..7e35906 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -125,9 +125,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         startGrid(0);
 
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 100; i++)
             grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i);
-        }
 
         readCnt.set(1);
 
@@ -135,9 +134,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         readCnt.set(Integer.MAX_VALUE);
 
-        for (int i = 0; i < 50; i++) {
+        for (int i = 0; i < 50; i++)
             assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null;
-        }
 
         stopGrid(0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad9e4db5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index b02d022..176ab3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
@@ -140,6 +141,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingSyncCheckDataTest.class);
         suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
         suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);


Mime
View raw message