ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [21/50] ignite git commit: IGNITE-5701 - Some nodes have partitionUpdateCounter equal to 0 after rebalancing
Date Tue, 11 Jul 2017 09:27:20 GMT
IGNITE-5701 - Some nodes have partitionUpdateCounter equal to 0 after rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/993f7fbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/993f7fbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/993f7fbe

Branch: refs/heads/master
Commit: 993f7fbe1d49a524e2dee626aef72e16fd5d3cda
Parents: 517a23d
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Fri Jul 7 18:55:27 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Fri Jul 7 18:55:41 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  2 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  6 --
 .../distributed/dht/GridDhtLocalPartition.java  | 45 ------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 16 ++++-
 .../dht/preloader/GridDhtPartitionDemander.java |  5 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 .../GridDhtPartitionSupplyMessage.java          | 20 +++---
 .../GridCacheDatabaseSharedManager.java         |  2 +-
 .../IgnitePdsCacheRebalancingAbstractTest.java  | 74 ++++++++++++++++++++
 .../wal/IgniteWalHistoryReservationsTest.java   | 29 ++++++--
 10 files changed, 131 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 085712a..35b0577 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -626,7 +626,7 @@ public final class IgniteSystemProperties {
     /**
      * WAL rebalance threshold.
      */
-     public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
+    public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
 
     /** Ignite page memory concurrency level. */
     public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 57dd622..77cc642 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -123,12 +123,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected void onUpdateFinished(long cntr) {
-        if (cctx.shared().database().persistenceEnabled())
-            locPart.onUpdateReceived(cntr);
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean isDht() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 8e42351..725822d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -157,13 +157,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     @GridToStringExclude
     private final CacheDataStore store;
 
-    /** Partition updates. */
-    @GridToStringExclude
-    private final ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>();
-
-    /** Last applied update. */
-    private final AtomicLong lastApplied = new AtomicLong(0);
-
     /** Set if failed to move partition to RENTING state due to reservations, to be checked when
      * reservation is released. */
     private volatile boolean shouldBeRenting;
@@ -349,44 +342,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return Last applied update.
-     */
-    public long lastAppliedUpdate() {
-        return lastApplied.get();
-    }
-
-    /**
-     * @param cntr Received counter.
-     */
-    public void onUpdateReceived(long cntr) {
-        boolean changed = updates.putIfAbsent(cntr, true) == null;
-
-        if (!changed)
-            return;
-
-        while (true) {
-            Map.Entry<Long, Boolean> entry = updates.firstEntry();
-
-            if (entry == null)
-                return;
-
-            long first = entry.getKey();
-
-            long cntr0 = lastApplied.get();
-
-            if (first <= cntr0)
-                updates.remove(first);
-            else if (first == cntr0 + 1)
-                if (lastApplied.compareAndSet(cntr0, first))
-                    updates.remove(first);
-                else
-                    break;
-            else
-                break;
-        }
-    }
-
-    /**
      * @return If partition is moving or owning or renting.
      */
     public boolean valid() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index cf0dd5f..2f54810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -673,6 +673,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (loc == null || loc.state() == EVICTED) {
             locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
+            T2<Long, Long> cntr = cntrMap.get(p);
+
+            if (cntr != null)
+                loc.updateCounter(cntr.get2());
+
             if (ctx.pageStore() != null) {
                 try {
                     ctx.pageStore().onPartitionCreated(grp.groupId(), p);
@@ -1334,11 +1339,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                 if (cntr != null && cntr.get2() > part.updateCounter())
                     part.updateCounter(cntr.get2());
+                else if (part.updateCounter() > 0)
+                    this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter()));
             }
         }
         finally {
             lock.writeLock().unlock();
-
         }
     }
 
@@ -1715,6 +1721,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         result.add(ctx.localNodeId());
                     }
 
+                    U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                        "[nodeId=" + ctx.localNodeId() + "cacheOrGroupName=" + grp.cacheOrGroupName() +
+                        ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]");
+
                 }
             }
 
@@ -1731,6 +1741,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         result.add(e.getKey());
                     }
                 }
+
+                U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " +
+                    "[nodeId=" + ctx.localNodeId() + "cacheOrGroupName=" + grp.cacheOrGroupName() +
+                    ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]");
             }
 
             if (updateSeq)

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e7e95b2..4f34aba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -640,7 +640,7 @@ public class GridDhtPartitionDemander {
 
                     assert part != null;
 
-                    boolean last = supply.last().contains(p);
+                    boolean last = supply.last().containsKey(p);
 
                     if (part.state() == MOVING) {
                         boolean reserved = part.reserve();
@@ -680,6 +680,9 @@ public class GridDhtPartitionDemander {
                             // If message was last for this partition,
                             // then we take ownership.
                             if (last) {
+                                if (supply.isClean(p))
+                                    part.updateCounter(supply.last().get(p));
+
                                 top.own(part);
 
                                 fut.partitionDone(id, p);

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 1cc6c28..3ead982 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -414,7 +414,7 @@ class GridDhtPartitionSupplier {
                     }
 
                     // Mark as last supply message.
-                    s.last(part);
+                    s.last(part, loc.updateCounter());
 
                     phase = SupplyContextPhase.NEW;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index ef14a90..90d11f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -56,8 +56,8 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     private AffinityTopologyVersion topVer;
 
     /** Partitions that have been fully sent. */
-    @GridDirectCollection(int.class)
-    private Collection<Integer> last;
+    @GridDirectMap(keyType = int.class, valueType = long.class)
+    private Map<Integer, Long> last;
 
     /** Partitions which were not found. */
     @GridToStringInclude
@@ -128,19 +128,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     /**
      * @return Flag to indicate last message for partition.
      */
-    Collection<Integer> last() {
-        return last == null ? Collections.<Integer>emptySet() : last;
+    Map<Integer, Long> last() {
+        return last == null ? Collections.<Integer, Long>emptyMap() : last;
     }
 
     /**
      * @param p Partition which was fully sent.
      */
-    void last(int p) {
+    void last(int p, long cntr) {
         if (last == null)
-            last = new HashSet<>();
+            last = new HashMap<>();
 
-        if (last.add(p)) {
-            msgSize += 4;
+        if (last.put(p, cntr) == null) {
+            msgSize += 12;
 
             // If partition is empty, we need to add it.
             if (!infos().containsKey(p)) {
@@ -304,7 +304,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                if (!writer.writeMap("last", last, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
                     return false;
 
                 writer.incrementState();
@@ -382,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 7:
-                last = reader.readCollection("last", MessageCollectionItemType.INT);
+                last = reader.readMap("last", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4af4daf..d64677e 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2126,7 +2126,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     CacheState state = new CacheState(locParts.size());
 
                     for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
-                        state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate());
+                        state.addPartitionState(part.id(), part.dataStore().fullSize(), part.updateCounter());
 
                     cpRec.addCacheGroupState(grp.groupId(), state);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index cbc2623..588b3ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cache.QueryEntity;
@@ -41,11 +43,13 @@ import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
@@ -489,6 +493,76 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
     }
 
     /**
+     * @throws Exception If failed
+     */
+    public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception {
+        final Ignite ig = startGrids(4);
+
+        ig.active(true);
+
+        int k = 0;
+
+        try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) {
+            ds.allowOverwrite(true);
+
+            for (int k0 = k; k < k0 + 10_000; k++)
+                ds.addData(k, k);
+        }
+
+        for (int t = 0; t < 10; t++) {
+            IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        stopGrid(3);
+
+                        IgniteEx ig0 = startGrid(3);
+
+                        awaitPartitionMapExchange();
+
+                        ig0.cache(cacheName).rebalance().get();
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) {
+                ds.allowOverwrite(true);
+
+                while (!fut.isDone()) {
+                    ds.addData(k, k);
+
+                    k++;
+
+                    U.sleep(1);
+                }
+            }
+
+            fut.get();
+
+            Map<Integer, Long> cntrs = new HashMap<>();
+
+            for (int g = 0; g < 4; g++) {
+                IgniteEx ig0 = grid(g);
+
+                for (GridDhtLocalPartition part : ig0.cachex(cacheName).context().topology().currentLocalPartitions()) {
+                    if (cntrs.containsKey(part.id()))
+                        assertEquals(String.valueOf(part.id()), (long) cntrs.get(part.id()), part.updateCounter());
+                    else
+                        cntrs.put(part.id(), part.updateCounter());
+                }
+
+                for (int k0 = 0; k0 < k; k0++) {
+                    assertEquals(String.valueOf(k0), k0, ig0.cache(cacheName).get(k0));
+                }
+            }
+
+            assertEquals(ig.affinity(cacheName).partitions(), cntrs.size());
+        }
+    }
+
+    /**
      *
      */
     private static class TestValue implements Serializable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/993f7fbe/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 48d8c21..4bea63f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.MemoryPolicyConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -201,7 +202,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
         int entryCnt = 10_000;
 
-        Ignite ig0 = startGrids(2);
+        IgniteEx ig0 = (IgniteEx) startGrids(2);
 
         ig0.active(true);
 
@@ -219,7 +220,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
         forceCheckpoint();
 
-        Ignite ig1 = startGrid(1);
+        IgniteEx ig1 = startGrid(1);
 
         IgniteCache<Integer, Integer> cache1 = ig1.cache("cache1");
 
@@ -236,6 +237,16 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
                 assertEquals("k=" + k, k, cache1.get(k));
             }
         }
+
+        cache.rebalance().get();
+
+        for (int p = 0; p < ig1.affinity("cache1").partitions(); p++) {
+            GridDhtLocalPartition p0 = ig0.context().cache().cache("cache1").context().topology().localPartition(p);
+            GridDhtLocalPartition p1 = ig1.context().cache().cache("cache1").context().topology().localPartition(p);
+
+            assertTrue(p0.updateCounter() > 0);
+            assertEquals(p0.updateCounter(), p1.updateCounter());
+        }
     }
 
     /**
@@ -244,7 +255,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
     public void testNodeIsClearedIfHistoryIsUnavailable() throws Exception {
         int entryCnt = 10_000;
 
-        Ignite ig0 = startGrids(2);
+        IgniteEx ig0 = (IgniteEx) startGrids(2);
 
         ig0.active(true);
 
@@ -269,7 +280,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
                 assertEquals("k=" + k, k, cache.get(k));
         }
 
-        Ignite ig1 = startGrid(1);
+        IgniteEx ig1 = startGrid(1);
 
         IgniteCache<Integer, Integer> cache1 = ig1.cache("cache1");
 
@@ -286,6 +297,16 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
                 assertEquals("k=" + k, k, cache1.get(k));
             }
         }
+
+        cache.rebalance().get();
+
+        for (int p = 0; p < ig1.affinity("cache1").partitions(); p++) {
+            GridDhtLocalPartition p0 = ig0.context().cache().cache("cache1").context().topology().localPartition(p);
+            GridDhtLocalPartition p1 = ig1.context().cache().cache("cache1").context().topology().localPartition(p);
+
+            assertTrue(p0.updateCounter() > 0);
+            assertEquals(p0.updateCounter(), p1.updateCounter());
+        }
     }
 
     /**


Mime
View raw message