ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1027 Fixed early rebalance sync future completion.
Date Wed, 02 Dec 2015 09:40:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1027 [created] 13276ac2e


ignite-1027 Fixed early rebalance sync future completion.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13276ac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13276ac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13276ac2

Branch: refs/heads/ignite-1027
Commit: 13276ac2e77aff2fb8370da54255993a16797421
Parents: 3ac1504
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 2 12:39:05 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 2 12:40:28 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 51 +++++-----
 .../processors/cache/GridCachePreloader.java    | 11 ++-
 .../dht/preloader/GridDhtPartitionDemander.java | 27 ++++--
 .../dht/preloader/GridDhtPreloader.java         |  2 +-
 ...cingDelayedPartitionMapExchangeSelfTest.java |  9 +-
 .../GridCacheRebalancingAsyncSelfTest.java      |  3 +-
 .../GridCacheRebalancingSyncCheckDataTest.java  | 98 ++++++++++++++++++++
 .../GridCacheRebalancingSyncSelfTest.java       | 55 +++++------
 ...eRebalancingUnmarshallingFailedSelfTest.java |  6 +-
 .../testsuites/IgniteCacheTestSuite3.java       |  2 +
 10 files changed, 191 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b13a5af..17abace 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1349,13 +1349,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                                 long delay = cacheCtx.config().getRebalanceDelay();
 
-                                GridDhtPreloaderAssignments assigns = null;
-
                                 // Don't delay for dummy reassigns to avoid infinite recursion.
-                                if (delay == 0 || forcePreload)
-                                    assigns = cacheCtx.preloader().assign(exchFut);
+                                if (delay == 0 || forcePreload) {
+                                    GridDhtPreloaderAssignments assigns = cacheCtx.preloader().assign(exchFut);
 
-                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                                    if (assigns != null)
+                                        assignsMap.put(cacheCtx.cacheId(), assigns);
+                                }
                             }
                         }
                     }
@@ -1399,24 +1399,27 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                         waitList.add(cctx.cacheContext(cId).name());
                                 }
 
-                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(
-                                    assignsMap.get(cacheId), forcePreload, waitList, cnt);
+                                GridDhtPreloaderAssignments assignments = assignsMap.get(cacheId);
+
+                                if (assignments != null) {
+                                    Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignments,
+                                        forcePreload,
+                                        waitList,
+                                        cnt);
 
-                                if (r != null) {
-                                    U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                        ", waitList=" + waitList.toString() + "]");
+                                    if (r != null) {
+                                        U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
+                                            ", waitList=" + waitList.toString() + "]");
 
-                                    if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
-                                        marshR = r;
-                                    else
-                                        orderedRs.add(r);
+                                        if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
+                                            marshR = r;
+                                        else
+                                            orderedRs.add(r);
+                                    }
                                 }
                             }
                         }
 
-                        if (asyncStartFut != null)
-                            asyncStartFut.get(); // Wait for thread stop.
-
                         rebalanceQ.addAll(orderedRs);
 
                         if (marshR != null || !rebalanceQ.isEmpty()) {
@@ -1425,7 +1428,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 
-                                if (marshR != null)
+                                if (marshR != null) {
                                     try {
                                         marshR.call(); //Marshaller cache rebalancing launches in sync way.
                                     }
@@ -1435,6 +1438,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                                         continue;
                                     }
+                                }
+
+                                if (asyncStartFut != null)
+                                    asyncStartFut.get(); // Wait for thread stop.
 
                                 final GridFutureAdapter fut = new GridFutureAdapter();
 
@@ -1463,17 +1470,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                             fut.onDone();
                                         }
                                     }
-                                }, /*system pool*/ true);
+                                }, /*system pool*/true);
                             }
-                            else
+                            else {
                                 U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                            }
                         }
-                        else
+                        else {
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
                                 "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                 ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
                     }
                 }
                 catch (IgniteInterruptedCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 8e1164b..c8fcb90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -86,9 +86,9 @@ public interface GridCachePreloader {
 
     /**
      * @param exchFut Exchange future to assign.
-     * @return Assignments.
+     * @return Assignments or {@code null} if detected that there are pending exchanges.
      */
-    public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
@@ -97,9 +97,12 @@ public interface GridCachePreloader {
      * @param forcePreload Force preload flag.
      * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
+     * @return Rebalancing closure.
      */
-    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-        Collection<String> caches, int cnt);
+    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload,
+        Collection<String> caches,
+        int cnt);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index eb9e97f..6b923d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -251,6 +251,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param name Cache name.
      * @param fut Future.
+     * @throws IgniteCheckedException If failed.
      */
     private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException {
         if (log.isDebugEnabled())
@@ -283,7 +284,7 @@ public class GridDhtPartitionDemander {
      * @param force {@code True} if dummy reassign.
      * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
-     * @throws IgniteCheckedException If failed.
+     * @return Rebalancing closure.
      */
     Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
         final Collection<String> caches, int cnt) {
@@ -301,12 +302,13 @@ public class GridDhtPartitionDemander {
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
-            else
+            else {
                 fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> future) {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
                         oldFut.onDone(fut.result());
                     }
                 });
+            }
 
             rebalanceFut = fut;
 
@@ -357,6 +359,9 @@ public class GridDhtPartitionDemander {
 
     /**
      * @param fut Future.
+     * @param assigns Assignments.
+     * @throws IgniteCheckedException If failed.
+     * @return
      */
     private boolean requestPartitions(
         RebalanceFuture fut,
@@ -370,7 +375,7 @@ public class GridDhtPartitionDemander {
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
-            fut.appendPartitions(node.id(), d.partitions());//Future preparation.
+            fut.appendPartitions(node.id(), d.partitions()); //Future preparation.
         }
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
@@ -413,7 +418,8 @@ public class GridDhtPartitionDemander {
                         initD.timeout(cctx.config().getRebalanceTimeout());
 
                         synchronized (fut) {
-                            if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened.
+                            if (!fut.isDone())
+                                // Future can be already cancelled at this moment and all failovers happened.
                                 // New requests will not be covered by failovers.
                                 cctx.io().sendOrderedMessage(node,
                                     rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
@@ -427,9 +433,12 @@ public class GridDhtPartitionDemander {
                 }
             }
             else {
-                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
-                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
+                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
+                    ", mode=" + cfg.getRebalanceMode() +
+                    ", fromNode=" + node.id() +
+                    ", partitionsCount=" + parts.size() +
+                    ", topology=" + fut.topologyVersion() +
+                    ", updateSeq=" + fut.updateSeq + "]");
 
                 d.timeout(cctx.config().getRebalanceTimeout());
                 d.workerId(0);//old api support.
@@ -860,7 +869,7 @@ public class GridDhtPartitionDemander {
                     return true;
 
                 U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name()
-                    + ", topology=" + topologyVersion());
+                    + ", topology=" + topologyVersion() + ']');
 
                 if (!cctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 3e3cee3..14734d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -324,7 +324,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
                         exchFut.exchangeId());
 
-                break;
+                return null;
             }
 
             // If partition belongs to local node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index a1ea7ad..2890fcb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -73,20 +73,20 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
     public class DelayableCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
         @Override public void sendMessage(final ClusterNode node, final Message msg,
-            final IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
             final Object msg0 = ((GridIoMessage)msg).message();
 
             if (msg0 instanceof GridDhtPartitionsFullMessage && record &&
                 ((GridDhtPartitionsFullMessage)msg0).exchangeId() == null) {
                 rs.putIfAbsent(node.id(), new Runnable() {
                     @Override public void run() {
-                        DelayableCommunicationSpi.super.sendMessage(node, msg, ackClosure);
+                        DelayableCommunicationSpi.super.sendMessage(node, msg, ackC);
                     }
                 });
             }
             else
                 try {
-                    super.sendMessage(node, msg, ackClosure);
+                    super.sendMessage(node, msg, ackC);
                 }
                 catch (Exception e) {
                     U.log(null, e);
@@ -144,9 +144,8 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
 
         awaitPartitionMapExchange();
 
-        for (Runnable r : rs.values()) {
+        for (Runnable r : rs.values())
             r.run();
-        }
 
         U.sleep(10000); // Enough time to process delayed GridDhtPartitionsFullMessages.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
index 7759c70..bcda0da 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -33,9 +33,8 @@ public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncS
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
 
-        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration())
             cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
-        }
 
         return iCfg;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
new file mode 100644
index 0000000..5e4a5c4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncCheckDataTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncCheckDataTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setRebalanceMode(SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDataRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        final int KEYS = 10_000;
+
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        for (int i = 0; i < KEYS; i++)
+            cache.put(i, i);
+
+
+        for (int i = 0; i < 3; i++) {
+            log.info("Iteration: " + i);
+
+            final AtomicInteger idx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try(Ignite ignite = startGrid(idx.getAndIncrement())) {
+                        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+                        for (int i = 0; i < KEYS; i++)
+                            assertNotNull(cache.localPeek(i));
+                    }
+
+                    return null;
+                }
+            }, 5, "start-node");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 8c5cd40..3b25bd7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -45,19 +45,19 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static int TEST_SIZE = 100_000;
+    private static final int TEST_SIZE = 100_000;
 
     /** partitioned cache name. */
-    protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+    protected static final String CACHE_NAME_DHT_PARTITIONED = "cacheP";
 
     /** partitioned cache 2 name. */
-    protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+    protected static final String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
 
     /** replicated cache name. */
-    protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+    protected static final String CACHE_NAME_DHT_REPLICATED = "cacheR";
 
     /** replicated cache 2 name. */
-    protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+    protected static final String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
 
     /** */
     private volatile boolean concurrentStartFinished;
@@ -122,6 +122,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
     protected void generateData(Ignite ignite, int from, int iter) {
         generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
@@ -132,6 +134,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
+     * @param name Cache name.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
     protected void generateData(Ignite ignite, String name, int from, int iter) {
         for (int i = from; i < from + TEST_SIZE; i++) {
@@ -144,9 +149,10 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
-     * @throws IgniteCheckedException Exception.
+     * @param from Start from key.
+     * @param iter Iteration.
      */
-    protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+    protected void checkData(Ignite ignite, int from, int iter) {
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
         checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
         checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
@@ -155,10 +161,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite.
+     * @param from Start from key.
+     * @param iter Iteration.
      * @param name Cache name.
-     * @throws IgniteCheckedException Exception.
      */
-    protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+    protected void checkData(Ignite ignite, String name, int from, int iter) {
         for (int i = from; i < from + TEST_SIZE; i++) {
             if (i % (TEST_SIZE / 10) == 0)
                 log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
@@ -169,7 +176,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception Exception
+     * @throws Exception If failed.
      */
     public void testSimpleRebalancing() throws Exception {
         Ignite ignite = startGrid(0);
@@ -206,7 +213,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception Exception
+     * @throws Exception If failed.
      */
     public void testLoadRebalancing() throws Exception {
         final Ignite ignite = startGrid(0);
@@ -240,14 +247,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         Thread t2 = new Thread() {
             @Override public void run() {
-                while (!concurrentStartFinished) {
-                    try {
-                        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
-                    }
-                    catch (IgniteCheckedException e) {
-                        e.printStackTrace();
-                    }
-                }
+                while (!concurrentStartFinished)
+                    checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
             }
         };
 
@@ -282,7 +283,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
      * @param id Node id.
      * @param major Major ver.
      * @param minor Minor ver.
-     * @throws IgniteCheckedException Exception.
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
         waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
@@ -291,7 +292,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      * @param id Node id.
      * @param major Major ver.
-     * @throws IgniteCheckedException Exception.
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
         waitForRebalancing(id, new AffinityTopologyVersion(major));
@@ -300,7 +301,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      * @param id Node id.
      * @param top Topology version.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
         boolean finished = false;
@@ -327,6 +328,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     protected void checkSupplyContextMapIsEmpty() {
         for (Ignite g : G.allGrids()) {
             for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
@@ -342,12 +344,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 5 * 60_000;
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testComplexRebalancing() throws Exception {
         final Ignite ignite = startGrid(0);
@@ -368,9 +371,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                     startGrid(1);
                     startGrid(2);
 
-                    while (!concurrentStartFinished2) {
+                    while (!concurrentStartFinished2)
                         U.sleep(10);
-                    }
 
                     waitForRebalancing(0, 5, 0);
                     waitForRebalancing(1, 5, 0);
@@ -387,9 +389,8 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
                     grid(0).getOrCreateCache(cacheRCfg);
 
-                    while (!concurrentStartFinished3) {
+                    while (!concurrentStartFinished3)
                         U.sleep(10);
-                    }
 
                     concurrentStartFinished = true;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
index 506f1c2..7e35906 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -125,9 +125,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         startGrid(0);
 
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 100; i++)
             grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i);
-        }
 
         readCnt.set(1);
 
@@ -135,9 +134,8 @@ public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonA
 
         readCnt.set(Integer.MAX_VALUE);
 
-        for (int i = 0; i < 50; i++) {
+        for (int i = 0; i < 50; i++)
             assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null;
-        }
 
         stopGrid(0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/13276ac2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index b02d022..176ab3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRabalancingDelayedPartitionMapExchangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncCheckDataTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
@@ -140,6 +141,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingSyncCheckDataTest.class);
         suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
         suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
         suite.addTestSuite(GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.class);


Mime
View raw message