ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [29/50] [abbrv] ignite git commit: Ignite-1093 "Rebalancing with default parameters is very slow" fixes.
Date Tue, 10 Nov 2015 12:53:25 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 998c720..c634ff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -47,27 +50,42 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache preloader.
  */
 public class GridDhtPreloader extends GridCachePreloaderAdapter {
+    /**
+     * Rebalancing was refactored at version 1.5.0, but backward compatibility to previous implementation was saved.
+     * Node automatically chose communication protocol depends on remote node's version.
+     * Backward compatibility may be removed at Ignite 2.x.
+     */
+    public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
+
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
@@ -81,10 +99,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool supplyPool;
+    private GridDhtPartitionSupplier supplier;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool demandPool;
+    private GridDhtPartitionDemander demander;
 
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
@@ -92,10 +110,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Busy lock to prevent activities from accessing exchanger while it's stopping. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
+    /** Demand lock. */
+    private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
+
     /** Pending affinity assignment futures. */
     private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>();
+
+    /** */
+    private final AtomicInteger partsEvictOwning = new AtomicInteger();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -179,8 +206,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx);
+        demander = new GridDhtPartitionDemander(cctx, busyLock);
+
+        supplier.start();
+        demander.start();
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -197,19 +227,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         final long startTopVer = loc.order();
 
         topVer.setIfGreater(startTopVer);
-
-        supplyPool.start();
-        demandPool.start();
     }
 
     /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         super.preloadPredicate(preloadPred);
 
-        assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+        assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
 
-        supplyPool.preloadPredicate(preloadPred);
-        demandPool.preloadPredicate(preloadPred);
+        supplier.preloadPredicate(preloadPred);
+        demander.preloadPredicate(preloadPred);
     }
 
     /** {@inheritDoc} */
@@ -223,37 +250,109 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
-        if (supplyPool != null)
-            supplyPool.stop();
+        if (supplier != null)
+            supplier.stop();
 
-        if (demandPool != null)
-            demandPool.stop();
+        if (demander != null)
+            demander.stop();
 
         top = null;
     }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
-        if (err == null) {
+        if (err == null)
             startFut.onDone();
+        else
+            startFut.onDone(err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
+        demander.updateLastExchangeFuture(lastFut);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+        supplier.onTopologyChanged(topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
+        // No assignments for disabled preloader.
+        GridDhtPartitionTopology top = cctx.dht().topology();
+
+        if (!cctx.rebalanceEnabled())
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
+
+        int partCnt = cctx.affinity().partitions();
 
-            final long start = U.currentTimeMillis();
+        assert exchFut.forcePreload() || exchFut.dummyReassign() ||
+            exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
+            "Topology version mismatch [exchId=" + exchFut.exchangeId() +
+                ", topVer=" + top.topologyVersion() + ']';
 
-            final CacheConfiguration cfg = cctx.config();
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
-            if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
-                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
+        AffinityTopologyVersion topVer = assigns.topologyVersion();
 
-                demandPool.syncFuture().listen(new CI1<Object>() {
-                    @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
-                            "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
+        for (int p = 0; p < partCnt; p++) {
+            if (cctx.shared().exchange().hasPendingExchange()) {
+                if (log.isDebugEnabled())
+                    log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
+                        exchFut.exchangeId());
+
+                break;
+            }
+
+            // If partition belongs to local node.
+            if (cctx.affinity().localNode(p, topVer)) {
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+
+                assert part != null;
+                assert part.id() == p;
+
+                if (part.state() != MOVING) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition assignment (state is not MOVING): " + part);
+
+                    continue; // For.
+                }
+
+                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                if (picked.isEmpty()) {
+                    top.own(part);
+
+                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        cctx.events().addPreloadEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                            discoEvt.type(), discoEvt.timestamp());
                     }
-                });
+
+                    if (log.isDebugEnabled())
+                        log.debug("Owning partition as there are no other owners: " + part);
+                }
+                else {
+                    ClusterNode n = F.rand(picked);
+
+                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                    if (msg == null) {
+                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                            top.updateSequence(),
+                            exchFut.exchangeId().topologyVersion(),
+                            cctx.cacheId()));
+                    }
+
+                    msg.addPartition(p);
+                }
             }
         }
-        else
-            startFut.onDone(err);
+
+        return assigns;
     }
 
     /** {@inheritDoc} */
@@ -267,24 +366,77 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         topVer.set(topVer0);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onExchangeFutureAdded() {
-        demandPool.onExchangeFutureAdded();
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Picked owners.
+     */
+    private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+
+        int affCnt = affNodes.size();
+
+        Collection<ClusterNode> rmts = remoteOwners(p, topVer);
+
+        int rmtCnt = rmts.size();
+
+        if (rmtCnt <= affCnt)
+            return rmts;
+
+        List<ClusterNode> sorted = new ArrayList<>(rmts);
+
+        // Sort in descending order, so nodes with higher order will be first.
+        Collections.sort(sorted, CU.nodeComparator(false));
+
+        // Pick newest nodes.
+        return sorted.subList(0, affCnt);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId()));
     }
 
     /** {@inheritDoc} */
-    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demandPool.updateLastExchangeFuture(lastFut);
+    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2 s) {
+        if (!enterBusy())
+            return;
+
+        try {
+            demandLock.readLock().lock();
+            try {
+                demander.handleSupplyMessage(idx, id, s);
+            }
+            finally {
+                demandLock.readLock().unlock();
+            }
+        }
+        finally {
+            leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demandPool.assign(exchFut);
+    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+        if (!enterBusy())
+            return;
+
+        try {
+            supplier.handleDemandMessage(idx, id, d);
+        }
+        finally {
+            leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
-        demandPool.addAssignments(assignments, forcePreload);
+    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload, Collection<String> caches, int cnt) {
+        return demander.addAssignments(assignments, forcePreload, caches, cnt);
     }
 
     /**
@@ -296,7 +448,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
+        return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
     /**
@@ -580,12 +737,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void forcePreload() {
-        demandPool.forcePreload();
+        demander.forcePreload();
     }
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demandPool.unwindUndeploys();
+        demandLock.writeLock().lock();
+
+        try {
+            cctx.deploy().unwind(cctx);
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
     }
 
     /**
@@ -607,6 +771,44 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
+        partsToEvict.add(part);
+
+        if (partsEvictOwning.get() == 0 && partsEvictOwning.compareAndSet(0, 1)) {
+            cctx.closures().callLocalSafe(new GPC<Boolean>() {
+                @Override public Boolean call() {
+                    boolean locked = true;
+
+                    while (locked || !partsToEvict.isEmptyx()) {
+                        if (!locked && !partsEvictOwning.compareAndSet(0, 1))
+                            return false;
+
+                        try {
+                            GridDhtLocalPartition part = partsToEvict.poll();
+
+                            if (part != null)
+                                part.tryEvict();
+                        }
+                        finally {
+                            if (!partsToEvict.isEmptyx())
+                                locked = true;
+                            else {
+                                boolean res = partsEvictOwning.compareAndSet(1, 0);
+
+                                assert res;
+
+                                locked = false;
+                            }
+                        }
+                    }
+
+                    return true;
+                }
+            }, /*system pool*/ true);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
         if (!forceKeyFuts.isEmpty()) {
             U.warn(log, "Pending force key futures [cache=" + cctx.name() +"]:");
@@ -621,6 +823,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
                 U.warn(log, ">>> " + fut);
         }
+
+        supplier.dumpDebugInfo();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 7c5e97c..810bd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -1292,6 +1292,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                         catch (IgniteCheckedException e) {
                                             U.error(log, "Failed to remove count down latch: " + latch0.name(), e);
                                         }
+                                        finally {
+                                            ctx.cache().context().txContextReset();
+                                        }
                                     }
                                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 26a41de..9315d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,7 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         if (log.isDebugEnabled())
                             U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
 
-                        return;
+                        selfOccupied = true;
+
+                        continue;
                     }
 
                     // Only process 1st response and ignore following ones. This scenario

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
index 835cdcb..c95a859 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridTuple4.java
@@ -239,7 +239,7 @@ public class GridTuple4<V1, V2, V3, V4> implements Iterable<Object>, Externaliza
 
         GridTuple4<?, ?, ?, ?> t = (GridTuple4<?, ?, ?, ?>)o;
 
-        return F.eq(val1, t.val2) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
+        return F.eq(val1, t.val1) && F.eq(val2, t.val2) && F.eq(val3, t.val3) && F.eq(val4, t.val4);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6254605..854ce95 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * <p>
      * This method is intended for test purposes only.
      */
-    void simulateNodeFailure() {
+    protected void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index 1b2b84d..f4423f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -87,7 +87,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes
     }
 
     /** Test key 1. */
-    public static class TestKey implements Externalizable {
+    protected static class TestKey implements Externalizable {
         /** Field. */
         @QuerySqlField(index = true)
         private String field;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index cadd03f..fe0b84e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -46,12 +51,6 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
-import javax.cache.CacheException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -192,13 +191,13 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
         boolean backup,
         final boolean commit
     ) throws Exception {
-        startGrids(gridCount());
-        awaitPartitionMapExchange();
+        try {
+            startGrids(gridCount());
+            awaitPartitionMapExchange();
 
-        for (int i = 0; i < gridCount(); i++)
-            info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
+            for (int i = 0; i < gridCount(); i++)
+                info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
 
-        try {
             final Ignite ignite = ignite(0);
 
             final IgniteCache<Object, Object> cache = ignite.cache(null).withNoRetries();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
new file mode 100644
index 0000000..7759c70
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingAsyncSelfTest.java
@@ -0,0 +1,68 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
+
+/**
+ *
+ */
+public class GridCacheRebalancingAsyncSelfTest extends GridCacheRebalancingSyncSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        for (CacheConfiguration cacheCfg : iCfg.getCacheConfiguration()) {
+            cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
+        }
+
+        return iCfg;
+    }
+
+    /**
+     * @throws Exception Exception.
+     */
+    public void testNodeFailedAtRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        startGrid(1);
+
+        GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)grid(1).context().
+            cache().internalCache(CACHE_NAME_DHT_REPLICATED).preloader().rebalanceFuture();
+
+        fut.get();
+
+        U.sleep(10);
+
+        ((TestTcpDiscoverySpi)grid(1).configuration().getDiscoverySpi()).simulateNodeFailure();
+
+        waitForRebalancing(0, 3);
+
+        checkSupplyContextMapIsEmpty();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
new file mode 100644
index 0000000..8c5cd40
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -0,0 +1,506 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Map;
+import java.util.Random;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static int TEST_SIZE = 100_000;
+
+    /** partitioned cache name. */
+    protected static String CACHE_NAME_DHT_PARTITIONED = "cacheP";
+
+    /** partitioned cache 2 name. */
+    protected static String CACHE_NAME_DHT_PARTITIONED_2 = "cacheP2";
+
+    /** replicated cache name. */
+    protected static String CACHE_NAME_DHT_REPLICATED = "cacheR";
+
+    /** replicated cache 2 name. */
+    protected static String CACHE_NAME_DHT_REPLICATED_2 = "cacheR2";
+
+    /** */
+    private volatile boolean concurrentStartFinished;
+
+    /** */
+    private volatile boolean concurrentStartFinished2;
+
+    /** */
+    private volatile boolean concurrentStartFinished3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        if (getTestGridName(10).equals(gridName))
+            iCfg.setClientMode(true);
+
+        CacheConfiguration<Integer, Integer> cachePCfg = new CacheConfiguration<>();
+
+        cachePCfg.setName(CACHE_NAME_DHT_PARTITIONED);
+        cachePCfg.setCacheMode(CacheMode.PARTITIONED);
+        cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cachePCfg.setBackups(1);
+        cachePCfg.setRebalanceBatchSize(1);
+        cachePCfg.setRebalanceBatchesPrefetchCount(1);
+        cachePCfg.setRebalanceOrder(2);
+
+        CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
+
+        cachePCfg2.setName(CACHE_NAME_DHT_PARTITIONED_2);
+        cachePCfg2.setCacheMode(CacheMode.PARTITIONED);
+        cachePCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cachePCfg2.setBackups(1);
+        cachePCfg2.setRebalanceOrder(2);
+        //cachePCfg2.setRebalanceDelay(5000);//Known issue, possible deadlock in case of low priority cache rebalancing delayed.
+
+        CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+        cacheRCfg.setName(CACHE_NAME_DHT_REPLICATED);
+        cacheRCfg.setCacheMode(CacheMode.REPLICATED);
+        cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheRCfg.setRebalanceBatchSize(1);
+        cacheRCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE);
+        ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem fail fix for Integer.MAX_VALUE.
+
+        CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();
+
+        cacheRCfg2.setName(CACHE_NAME_DHT_REPLICATED_2);
+        cacheRCfg2.setCacheMode(CacheMode.REPLICATED);
+        cacheRCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheRCfg2.setRebalanceOrder(4);
+
+        iCfg.setCacheConfiguration(cachePCfg, cachePCfg2, cacheRCfg, cacheRCfg2);
+
+        iCfg.setRebalanceThreadPoolSize(2);
+
+        return iCfg;
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    protected void generateData(Ignite ignite, int from, int iter) {
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+        generateData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    protected void generateData(Ignite ignite, String name, int from, int iter) {
+        for (int i = from; i < from + TEST_SIZE; i++) {
+            if (i % (TEST_SIZE / 10) == 0)
+                log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+            ignite.cache(name).put(i, i + name.hashCode() + iter);
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void checkData(Ignite ignite, int from, int iter) throws IgniteCheckedException {
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_PARTITIONED_2, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED, from, iter);
+        checkData(ignite, CACHE_NAME_DHT_REPLICATED_2, from, iter);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param name Cache name.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void checkData(Ignite ignite, String name, int from, int iter) throws IgniteCheckedException {
+        for (int i = from; i < from + TEST_SIZE; i++) {
+            if (i % (TEST_SIZE / 10) == 0)
+                log.info("<" + name + "> Checked " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+            assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) :
+                i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")";
+        }
+    }
+
+    /**
+     * @throws Exception Exception
+     */
+    public void testSimpleRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        startGrid(1);
+
+        waitForRebalancing(0, 2);
+        waitForRebalancing(1, 2);
+
+        stopGrid(0);
+
+        waitForRebalancing(1, 3);
+
+        startGrid(2);
+
+        waitForRebalancing(1, 4);
+        waitForRebalancing(2, 4);
+
+        stopGrid(2);
+
+        waitForRebalancing(1, 5);
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        checkData(grid(1), 0, 0);
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+    }
+
+    /**
+     * @throws Exception Exception
+     */
+    public void testLoadRebalancing() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        startGrid(1);
+
+        generateData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        concurrentStartFinished = false;
+
+        Thread t1 = new Thread() {
+            @Override public void run() {
+                Random rdm = new Random();
+
+                while (!concurrentStartFinished) {
+                    for (int i = 0; i < TEST_SIZE; i++) {
+                        if (i % (TEST_SIZE / 10) == 0)
+                            log.info("Prepared " + i * 100 / (TEST_SIZE) + "% entries (" + TEST_SIZE + ").");
+
+                        int ii = rdm.nextInt(TEST_SIZE);
+
+                        ignite.cache(CACHE_NAME_DHT_PARTITIONED).put(ii, ii + CACHE_NAME_DHT_PARTITIONED.hashCode());
+                    }
+                }
+            }
+        };
+
+        Thread t2 = new Thread() {
+            @Override public void run() {
+                while (!concurrentStartFinished) {
+                    try {
+                        checkData(ignite, CACHE_NAME_DHT_PARTITIONED, 0, 0);
+                    }
+                    catch (IgniteCheckedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+
+        t1.start();
+        t2.start();
+
+        startGrid(2);
+        startGrid(3);
+
+        stopGrid(2);
+
+        startGrid(4);
+
+        waitForRebalancing(3, 6);
+        waitForRebalancing(4, 6);
+
+        concurrentStartFinished = true;
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        t1.join();
+        t2.join();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        info("Time to rebalance entries: " + spend);
+    }
+
+    /**
+     * @param id Node id.
+     * @param major Major ver.
+     * @param minor Minor ver.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
+    }
+
+    /**
+     * @param id Node id.
+     * @param major Major ver.
+     * @throws IgniteCheckedException Exception.
+     */
+    protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
+        waitForRebalancing(id, new AffinityTopologyVersion(major));
+    }
+
+    /**
+     * @param id Node id.
+     * @param top Topology version.
+     * @throws IgniteCheckedException
+     */
+    protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
+        boolean finished = false;
+
+        while (!finished) {
+            finished = true;
+
+            for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
+                GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
+                if (fut.topologyVersion() == null || !fut.topologyVersion().equals(top)) {
+                    finished = false;
+
+                    break;
+                }
+                else if (!fut.get()) {
+                    finished = false;
+
+                    log.warning("Rebalancing finished with missed partitions.");
+                }
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    protected void checkSupplyContextMapIsEmpty() {
+        for (Ignite g : G.allGrids()) {
+            for (GridCacheAdapter c : ((IgniteEx)g).context().cache().internalCaches()) {
+
+                Object supplier = U.field(c.preloader(), "supplier");
+
+                Map map = U.field(supplier, "scMap");
+
+                synchronized (map) {
+                    assert map.isEmpty();
+                }
+            }
+        }
+    }
+
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void testComplexRebalancing() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        generateData(ignite, 0, 0);
+
+        log.info("Preloading started.");
+
+        long start = System.currentTimeMillis();
+
+        concurrentStartFinished = false;
+        concurrentStartFinished2 = false;
+        concurrentStartFinished3 = false;
+
+        Thread t1 = new Thread() {
+            @Override public void run() {
+                try {
+                    startGrid(1);
+                    startGrid(2);
+
+                    while (!concurrentStartFinished2) {
+                        U.sleep(10);
+                    }
+
+                    waitForRebalancing(0, 5, 0);
+                    waitForRebalancing(1, 5, 0);
+                    waitForRebalancing(2, 5, 0);
+                    waitForRebalancing(3, 5, 0);
+                    waitForRebalancing(4, 5, 0);
+
+                    //New cache should start rebalancing.
+                    CacheConfiguration<Integer, Integer> cacheRCfg = new CacheConfiguration<>();
+
+                    cacheRCfg.setName(CACHE_NAME_DHT_PARTITIONED + "_NEW");
+                    cacheRCfg.setCacheMode(CacheMode.PARTITIONED);
+                    cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
+                    grid(0).getOrCreateCache(cacheRCfg);
+
+                    while (!concurrentStartFinished3) {
+                        U.sleep(10);
+                    }
+
+                    concurrentStartFinished = true;
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        Thread t2 = new Thread() {
+            @Override public void run() {
+                try {
+                    startGrid(3);
+                    startGrid(4);
+
+                    concurrentStartFinished2 = true;
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        Thread t3 = new Thread() {
+            @Override public void run() {
+                generateData(ignite, 0, 1);
+
+                concurrentStartFinished3 = true;
+            }
+        };
+
+        t1.start();
+        t2.start();// Should cancel t1 rebalancing.
+        t3.start();
+
+        t1.join();
+        t2.join();
+        t3.join();
+
+        waitForRebalancing(0, 5, 1);
+        waitForRebalancing(1, 5, 1);
+        waitForRebalancing(2, 5, 1);
+        waitForRebalancing(3, 5, 1);
+        waitForRebalancing(4, 5, 1);
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        checkData(grid(4), 0, 1);
+
+        final Ignite ignite3 = grid(3);
+
+        Thread t4 = new Thread() {
+            @Override public void run() {
+                generateData(ignite3, 0, 2);
+
+            }
+        };
+
+        t4.start();
+
+        stopGrid(1);
+
+        waitForRebalancing(0, 6);
+        waitForRebalancing(2, 6);
+        waitForRebalancing(3, 6);
+        waitForRebalancing(4, 6);
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        stopGrid(0);
+
+        waitForRebalancing(2, 7);
+        waitForRebalancing(3, 7);
+        waitForRebalancing(4, 7);
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        stopGrid(2);
+
+        waitForRebalancing(3, 8);
+        waitForRebalancing(4, 8);
+
+        awaitPartitionMapExchange(true);
+
+        checkSupplyContextMapIsEmpty();
+
+        t4.join();
+
+        stopGrid(3);
+
+        waitForRebalancing(4, 9);
+
+        checkSupplyContextMapIsEmpty();
+
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
+        checkData(grid(4), 0, 2);
+
+        log.info("Spend " + spend + " seconds to rebalance entries.");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
new file mode 100644
index 0000000..831e82d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingUnmarshallingFailedSelfTest.java
@@ -0,0 +1,147 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class GridCacheRebalancingUnmarshallingFailedSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** partitioned cache name. */
+    protected static String CACHE = "cache";
+
+    /** Allows to change behavior of readExternal method. */
+    protected static AtomicInteger readCnt = new AtomicInteger();
+
+    /** Test key 1. */
+    private static class TestKey implements Externalizable {
+        /** Field. */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * @param field Test key 1.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** Test key 1. */
+        public TestKey() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return !(field != null ? !field.equals(key.field) : key.field != null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            field = (String)in.readObject();
+
+            if (readCnt.decrementAndGet() <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration iCfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<TestKey, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setName(CACHE);
+        cfg.setCacheMode(CacheMode.PARTITIONED);
+        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setBackups(0);
+
+        iCfg.setCacheConfiguration(cfg);
+
+        return iCfg;
+    }
+
+    /**
+     * @throws Exception e.
+     */
+    public void test() throws Exception {
+        readCnt.set(Integer.MAX_VALUE);
+
+        startGrid(0);
+
+        for (int i = 0; i < 100; i++) {
+            grid(0).cache(CACHE).put(new TestKey(String.valueOf(i)), i);
+        }
+
+        readCnt.set(1);
+
+        startGrid(1);
+
+        readCnt.set(Integer.MAX_VALUE);
+
+        for (int i = 0; i < 50; i++) {
+            assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) != null;
+        }
+
+        stopGrid(0);
+
+        for (int i = 50; i < 100; i++) {
+            assert grid(1).cache(CACHE).get(new TestKey(String.valueOf(i))) == null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index c4ad169..64f1495 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -142,26 +142,6 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testSingleZeroPoolSize() throws Exception {
-        preloadMode = SYNC;
-        poolSize = 0;
-
-        try {
-            startGrid(1);
-
-            assert false : "Grid should have been failed to start.";
-        }
-        catch (IgniteCheckedException e) {
-            info("Caught expected exception: " + e);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
      * @throws Exception If test failed.
      */
     public void testIntegrity() throws Exception {
@@ -602,4 +582,4 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
             // No-op.
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..51d8a2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -511,23 +511,6 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
-     */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
-        /** */
-        private boolean ignorePingResponse;
-
-        /** {@inheritDoc} */
-        protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
-            IgniteCheckedException {
-            if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
-                return;
-            else
-                super.writeToSocket(sock, msg, timeout);
-        }
-    }
-
-    /**
      * @throws Exception If any error occurs.
      */
     public void testNodeAdded() throws Exception {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
new file mode 100644
index 0000000..dbc54bc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
+
+/**
+ *
+ */
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    /** */
+    public boolean ignorePingResponse;
+
+    /** {@inheritDoc} */
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+        IgniteCheckedException {
+        if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
+            return;
+        else
+            super.writeToSocket(sock, msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        super.simulateNodeFailure();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..41d4b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -1228,7 +1229,7 @@ public abstract class GridAbstractTest extends TestCase {
 
         cfg.setCommunicationSpi(commSpi);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        TcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
 
         if (isDebug()) {
             discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 28d5c73..71f3ee3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -63,6 +63,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
@@ -414,6 +416,15 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      */
     @SuppressWarnings("BusyWait")
     protected void awaitPartitionMapExchange() throws InterruptedException {
+        awaitPartitionMapExchange(false);
+    }
+
+    /**
+     * @param waitEvicts If {@code true} will wait for evictions finished.
+     * @throws InterruptedException If interrupted.
+     */
+    @SuppressWarnings("BusyWait")
+    protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException {
         for (Ignite g : G.allGrids()) {
             IgniteKernal g0 = (IgniteKernal)g;
 
@@ -451,7 +462,10 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
 
                                 int actual = owners.size();
 
-                                if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)) {
+                                GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
+
+                                if (affNodes.size() != owners.size() || !affNodes.containsAll(owners) ||
+                                    (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) {
                                     LT.warn(log(), null, "Waiting for topology map update [" +
                                         "grid=" + g.name() +
                                         ", cache=" + cfg.getName() +
@@ -484,7 +498,9 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                 if (i == 0)
                                     start = System.currentTimeMillis();
 
-                                if (System.currentTimeMillis() - start > 30_000)
+                                if (System.currentTimeMillis() - start > 30_000) {
+                                    U.dumpThreads(log);
+
                                     throw new IgniteException("Timeout of waiting for topology map update [" +
                                         "grid=" + g.name() +
                                         ", cache=" + cfg.getName() +
@@ -493,6 +509,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                         ", p=" + p +
                                         ", readVer=" + readyVer +
                                         ", locNode=" + g.cluster().localNode() + ']');
+                                }
 
                                 Thread.sleep(200); // Busy wait.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 796c531..c3c3659 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -49,6 +49,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNea
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -135,6 +137,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
         suite.addTestSuite(IgniteTxReentryColocatedSelfTest.class);
 
         suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+        suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
 
         // Test for byte array value special case.
         suite.addTestSuite(GridCacheLocalByteArrayValuesSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
index 0226046..582bfe3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/spi/communication/tcp/GridOrderedMessageCancelSelfTest.java
@@ -107,25 +107,33 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTask() throws Exception {
+        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+        int initSize =  map.size();
+
         ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), Task.class, null);
 
-        testMessageSet(fut);
+        testMessageSet(fut, initSize, map);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTaskException() throws Exception {
+        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
+
+        int initSize =  map.size();
+
         ComputeTaskFuture<?> fut = executeAsync(compute(grid(0).cluster().forRemotes()), FailTask.class, null);
 
-        testMessageSet(fut);
+        testMessageSet(fut, initSize, map);
     }
 
     /**
      * @param fut Future to cancel.
      * @throws Exception If failed.
      */
-    private void testMessageSet(IgniteFuture<?> fut) throws Exception {
+    private void testMessageSet(IgniteFuture<?> fut, int initSize, Map map) throws Exception {
         cancelLatch.await();
 
         assertTrue(fut.cancel());
@@ -134,11 +142,9 @@ public class GridOrderedMessageCancelSelfTest extends GridCommonAbstractTest {
 
         assertTrue(U.await(finishLatch, 5000, MILLISECONDS));
 
-        Map map = U.field(((IgniteKernal)grid(0)).context().io(), "msgSetMap");
-
         info("Map: " + map);
 
-        assertTrue(map.isEmpty());
+        assertEquals(map.size(), initSize);
     }
 
     /**


Mime
View raw message