ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [1/2] ignite git commit: IGNITE-5872 - Replace counters map with arrays
Date Mon, 07 Aug 2017 12:48:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5872 [created] 37e7aca66


IGNITE-5872 - Replace counters map with arrays


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a5a4778
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a5a4778
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a5a4778

Branch: refs/heads/ignite-5872
Commit: 5a5a47788ff1a2f16e0512f435ad3eba1fcb72c6
Parents: d2cb2f7
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Mon Aug 7 14:05:28 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Aug 7 14:05:28 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../GridCachePartitionExchangeManager.java      |  12 +-
 .../cache/IgniteCacheOffheapManager.java        |   2 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |   7 +-
 .../dht/GridClientPartitionTopology.java        |  55 +++----
 .../dht/GridDhtPartitionTopology.java           |  25 +++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 143 ++++++++++--------
 .../CachePartitionFullCountersMap.java          | 105 ++++++++++++++
 .../CachePartitionPartialCountersMap.java       | 145 +++++++++++++++++++
 .../GridDhtPartitionsAbstractMessage.java       |   8 -
 .../GridDhtPartitionsExchangeFuture.java        |  26 ++--
 .../preloader/GridDhtPartitionsFullMessage.java |  13 +-
 .../GridDhtPartitionsSingleMessage.java         |  21 +--
 .../GridDhtPartitionsSingleRequest.java         |   8 -
 .../IgniteDhtPartitionCountersMap.java          |  14 +-
 .../persistence/GridCacheOffheapManager.java    |   2 +-
 .../continuous/GridContinuousProcessor.java     |   7 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  10 +-
 18 files changed, 434 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 5d573b2..9762586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -441,7 +441,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             if (clientTop != null) {
                                 grp.topology().update(topVer,
                                     clientTop.partitionMap(true),
-                                    clientTop.updateCounters(false),
+                                    clientTop.fullUpdateCounters(),
                                     Collections.<Integer>emptySet(),
                                     null);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d67d81d..e18eb55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -962,8 +962,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      *     finishUnmarshall methods are called).
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
-     * @param partHistSuppliers
-     * @param partsToReload
+     * @param partHistSuppliers Partition history suppliers map.
+     * @param partsToReload Partitions to reload map.
      * @return Message.
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
@@ -1007,7 +1007,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
 
                 if (exchId != null)
-                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters());
             }
         }
 
@@ -1025,7 +1025,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
 
             if (exchId != null)
-                m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true));
+                m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters());
         }
 
         return m;
@@ -1128,7 +1128,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     grp.affinity().similarAffinityKey());
 
                 if (sndCounters)
-                    m.partitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
+                    m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters());
             }
         }
 
@@ -1146,7 +1146,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 top.similarAffinityKey());
 
             if (sndCounters)
-                m.partitionUpdateCounters(top.groupId(), top.updateCounters(true));
+                m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters());
         }
 
         return m;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 001848e..4531802 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -414,7 +414,7 @@ public interface IgniteCacheOffheapManager {
         /**
          * @return Initial update counter.
          */
-        public Long initialUpdateCounter();
+        public long initialUpdateCounter();
 
         /**
          * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ba6c89d..9e48d45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1054,8 +1054,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** */
         private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>();
 
-        /** Initialized update counter. */
-        protected Long initCntr = 0L;
+        /** Initial update counter. */
+        protected long initCntr;
 
         /**
          * @param partId Partition number.
@@ -1600,7 +1600,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public Long initialUpdateCounter() {
+        @Override public long initialUpdateCounter() {
             return initCntr;
         }
 
@@ -1629,7 +1629,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
          * @param key Key.
          * @param oldVal Old value.
          * @param newVal New value.
-         * @throws IgniteCheckedException If failed.
          */
         private void updateIgfsMetrics(
             GridCacheContext cctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 4b9826e..cf45120 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -46,7 +48,6 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -104,7 +105,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
     /** Partition update counters. */
-    private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+    private CachePartitionFullCountersMap cntrMap = new CachePartitionFullCountersMap();
 
     /** */
     private final Object similarAffKey;
@@ -593,7 +594,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     @Override public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        Map<Integer, T2<Long, Long>> cntrMap,
+        @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
@@ -696,7 +697,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             }
 
             if (cntrMap != null)
-                this.cntrMap = new HashMap<>(cntrMap);
+                this.cntrMap = new CachePartitionFullCountersMap(cntrMap);
 
             consistencyCheck();
 
@@ -711,17 +712,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+    @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
         assert cntrMap != null;
 
         lock.writeLock().lock();
 
         try {
-            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+            for (int i = 0; i < cntrMap.size(); i++) {
+                int pId = cntrMap.partitionAt(i);
 
-                if (cntr == null || cntr.get2() < e.getValue().get2())
-                    this.cntrMap.put(e.getKey(), e.getValue());
+                long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+                long updateCntr = cntrMap.updateCounterAt(i);
+
+                if (this.cntrMap.updateCounter(pId) < updateCntr) {
+                    this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+                    this.cntrMap.updateCounter(pId, updateCntr);
+                }
             }
         }
         finally {
@@ -729,6 +735,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void applyUpdateCounters() {
+        // No-op on client topology.
+    }
+
     /**
      * Method checks is new partition map more stale than current partition map
      * New partition map is stale if topology version or update sequence are less than of current map
@@ -1043,33 +1054,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+    @Override public CachePartitionFullCountersMap fullUpdateCounters() {
         lock.readLock().lock();
 
         try {
-            if (skipZeros) {
-                Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrMap.size());
-
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    T2<Long, Long> val = e.getValue();
-
-                    if (val.get1() == 0L && val.get2() == 0L)
-                        continue;
-
-                    res.put(e.getKey(), e.getValue());
-                }
-
-                return res;
-            }
-            else
-                return new HashMap<>(cntrMap);
-}
+            return new CachePartitionFullCountersMap(cntrMap);
+        }
         finally {
             lock.readLock().unlock();
         }
     }
 
     /** {@inheritDoc} */
+    @Override public CachePartitionPartialCountersMap localUpdateCounters() {
+        return CachePartitionPartialCountersMap.EMPTY;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
         assert false : "Should not be called on non-affinity node";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 8688c4f..bab9030 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -29,12 +28,13 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -249,7 +249,7 @@ public interface GridDhtPartitionTopology {
      *      means full map received is not related to exchange
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
-     * @param partsToReload
+     * @param partsToReload Set of partitions that need to be reloaded.
      * @param msgTopVer Topology version from incoming message. This value is not null only for case message is not
      *      related to exchange. Value should be not less than previous 'Topology version from exchange'.
      * @return {@code True} if local state was changed.
@@ -257,7 +257,7 @@ public interface GridDhtPartitionTopology {
     public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+        @Nullable CachePartitionFullCountersMap cntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer);
 
@@ -270,9 +270,16 @@ public interface GridDhtPartitionTopology {
         GridDhtPartitionMap parts);
 
     /**
+     * Collects update counters collected during exchange. Called on coordinator.
+     *
      * @param cntrMap Counters map.
      */
-    public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
+    public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap);
+
+    /**
+     * Applies update counters collected during exchange on coordinator. Called on coordinator.
+     */
+    public void applyUpdateCounters();
 
     /**
      * Checks if there is at least one owner for each partition in the cache topology.
@@ -296,10 +303,14 @@ public interface GridDhtPartitionTopology {
     public Collection<Integer> lostPartitions();
 
     /**
-     * @param skipZeros If {@code true} then filters out zero counters.
      * @return Partition update counters.
      */
-    public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros);
+    public CachePartitionFullCountersMap fullUpdateCounters();
+
+    /**
+     * @return Partition update counters.
+     */
+    public CachePartitionPartialCountersMap localUpdateCounters();
 
     /**
      * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d7a224c..ff6bf7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -52,7 +54,6 @@ import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -130,7 +131,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
 
     /** Partition update counter. */
-    private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
+    private final CachePartitionFullCountersMap cntrMap;
 
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
@@ -139,8 +140,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param ctx Cache shared context.
      * @param grp Cache group.
      */
-    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
-        CacheGroupContext grp) {
+    public GridDhtPartitionTopologyImpl(
+        GridCacheSharedContext ctx,
+        CacheGroupContext grp
+    ) {
         assert ctx != null;
         assert grp != null;
 
@@ -152,6 +155,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
 
         locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
+
+        cntrMap = new CachePartitionFullCountersMap(locParts.length());
     }
 
     /** {@inheritDoc} */
@@ -662,10 +667,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (loc == null || loc.state() == EVICTED) {
             locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p));
 
-            T2<Long, Long> cntr = cntrMap.get(p);
+            long updCntr = cntrMap.updateCounter(p);
 
-            if (cntr != null)
-                loc.updateCounter(cntr.get2());
+            if (updCntr != 0)
+                loc.updateCounter(updCntr);
 
             if (ctx.pageStore() != null) {
                 try {
@@ -1102,7 +1107,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     @Override public boolean update(
         @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
-        @Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
+        @Nullable CachePartitionFullCountersMap incomeCntrMap,
         Set<Integer> partsToReload,
         @Nullable AffinityTopologyVersion msgTopVer) {
         if (log.isDebugEnabled())
@@ -1117,14 +1122,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 return false;
 
             if (incomeCntrMap != null) {
-                // update local map partition counters
-                for (Map.Entry<Integer, T2<Long, Long>> e : incomeCntrMap.entrySet()) {
-                    T2<Long, Long> existCntr = this.cntrMap.get(e.getKey());
-
-                    if (existCntr == null || existCntr.get2() < e.getValue().get2())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
-
                 // update local counters in partitions
                 for (int i = 0; i < locParts.length(); i++) {
                     GridDhtLocalPartition part = locParts.get(i);
@@ -1132,10 +1129,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (part == null)
                         continue;
 
-                    T2<Long, Long> cntr = incomeCntrMap.get(part.id());
+                    if (part.state() == OWNING || part.state() == MOVING) {
+                        long updCntr = incomeCntrMap.updateCounter(part.id());
 
-                    if (cntr != null)
-                        part.updateCounter(cntr.get2());
+                        if (updCntr != 0 && updCntr > part.updateCounter())
+                            part.updateCounter(updCntr);
+                    }
                 }
             }
 
@@ -1255,13 +1254,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                         assert locPart != null;
 
-                        if (incomeCntrMap != null) {
-                            T2<Long, Long> cntr = incomeCntrMap.get(p);
-
-                            if (cntr != null && cntr.get2() > locPart.updateCounter())
-                                locPart.updateCounter(cntr.get2());
-                        }
-
                         if (locPart.state() == MOVING) {
                             boolean success = locPart.own();
 
@@ -1281,13 +1273,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             changed = true;
                         }
-
-                        if (incomeCntrMap != null) {
-                            T2<Long, Long> cntr = incomeCntrMap.get(p);
-
-                            if (cntr != null && cntr.get2() > locPart.updateCounter())
-                                locPart.updateCounter(cntr.get2());
-                        }
                     }
                     else if (state == RENTING && partsToReload.contains(p)) {
                         GridDhtLocalPartition locPart = locParts.get(p);
@@ -1337,7 +1322,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+    @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
         assert cntrMap != null;
 
         long now = U.currentTimeMillis();
@@ -1356,25 +1341,55 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return;
 
-            for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+            for (int i = 0; i < cntrMap.size(); i++) {
+                int pId = cntrMap.partitionAt(i);
+
+                long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
+                long updateCntr = cntrMap.updateCounterAt(i);
+
+                if (this.cntrMap.updateCounter(pId) < updateCntr) {
+                    this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
+                    this.cntrMap.updateCounter(pId, updateCntr);
+                }
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyUpdateCounters() {
+        long now = U.currentTimeMillis();
+
+        lock.writeLock().lock();
+
+        try {
+            long acquired = U.currentTimeMillis();
 
-                if (cntr == null || cntr.get2() < e.getValue().get2())
-                    this.cntrMap.put(e.getKey(), e.getValue());
+            if (acquired - now >= 100) {
+                if (timeLog.isInfoEnabled())
+                    timeLog.info("Waited too long to acquire topology write lock " +
+                        "[cache=" + grp.groupId() + ", waitTime=" + (acquired - now) + ']');
             }
 
+            if (stopping)
+                return;
+
             for (int i = 0; i < locParts.length(); i++) {
                 GridDhtLocalPartition part = locParts.get(i);
 
                 if (part == null)
                     continue;
 
-                T2<Long, Long> cntr = cntrMap.get(part.id());
+                long updCntr = cntrMap.updateCounter(part.id());
 
-                if (cntr != null && cntr.get2() > part.updateCounter())
-                    part.updateCounter(cntr.get2());
-                else if (part.updateCounter() > 0)
-                    this.cntrMap.put(part.id(), new T2<>(part.initialUpdateCounter(), part.updateCounter()));
+                if (updCntr > part.updateCounter())
+                    part.updateCounter(updCntr);
+                else if (part.updateCounter() > 0) {
+                    cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter());
+                    cntrMap.updateCounter(part.id(), part.updateCounter());
+                }
             }
         }
         finally {
@@ -2046,26 +2061,32 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
+    @Override public CachePartitionFullCountersMap fullUpdateCounters() {
         lock.readLock().lock();
 
         try {
-            Map<Integer, T2<Long, Long>> res;
+            return new CachePartitionFullCountersMap(cntrMap);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
 
-            if (skipZeros) {
-                res = U.newHashMap(cntrMap.size());
+    /** {@inheritDoc} */
+    @Override public CachePartitionPartialCountersMap localUpdateCounters() {
+        lock.readLock().lock();
 
-                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
-                    Long cntr = e.getValue().get2();
+        try {
+            int locPartCnt = 0;
 
-                    if (ZERO.equals(cntr))
-                        continue;
+            for (int i = 0; i < locParts.length(); i++) {
+                GridDhtLocalPartition part = locParts.get(i);
 
-                    res.put(e.getKey(), e.getValue());
-                }
+                if (part != null)
+                    locPartCnt++;
             }
-            else
-                res = new HashMap<>(cntrMap);
+
+            CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt);
 
             for (int i = 0; i < locParts.length(); i++) {
                 GridDhtLocalPartition part = locParts.get(i);
@@ -2073,15 +2094,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
-                T2<Long, Long> cntr0 = res.get(part.id());
-                Long initCntr = part.initialUpdateCounter();
+                long updCntr = part.updateCounter();
+                long initCntr = part.initialUpdateCounter();
 
-                if (cntr0 == null || initCntr >= cntr0.get1()) {
-                    if (skipZeros && initCntr == 0L && part.updateCounter() == 0L)
-                        continue;
+                if (initCntr == 0L && updCntr == 0L)
+                    continue;
 
-                    res.put(part.id(), new T2<>(initCntr, part.updateCounter()));
-                }
+                res.add(part.id(), initCntr, updCntr);
             }
 
             return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
new file mode 100644
index 0000000..0f0d62d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Arrays;
+
+/**
+ *
+ */
+public class CachePartitionFullCountersMap {
+    /** */
+    public static final CachePartitionFullCountersMap EMPTY = new CachePartitionFullCountersMap();
+
+    /** */
+    private long[] initialUpdCntrs;
+
+    /** */
+    private long[] updCntrs;
+
+    /**
+     *
+     */
+    public CachePartitionFullCountersMap() {
+        // Empty map.
+    }
+
+    /**
+     * @param other Map to copy.
+     */
+    public CachePartitionFullCountersMap(CachePartitionFullCountersMap other) {
+        initialUpdCntrs = Arrays.copyOf(other.initialUpdCntrs, other.initialUpdCntrs.length);
+        updCntrs = Arrays.copyOf(other.updCntrs, other.updCntrs.length);
+    }
+
+    /**
+     * @param partsCnt Total number of partitions.
+     */
+    public CachePartitionFullCountersMap(int partsCnt) {
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * Gets an initial update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @return Initial update counter for the partition with the given ID.
+     */
+    public long initialUpdateCounter(int p) {
+        return initialUpdCntrs[p];
+    }
+
+    /**
+     * Gets an update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @return Update counter for the partition with the given ID.
+     */
+    public long updateCounter(int p) {
+        return updCntrs[p];
+    }
+
+    /**
+     * Sets an initial update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @param initialUpdCntr Initial update counter to set.
+     */
+    public void initialUpdateCounter(int p, long initialUpdCntr) {
+        initialUpdCntrs[p] = initialUpdCntr;
+    }
+
+    /**
+     * Sets an update counter by the partition ID.
+     *
+     * @param p Partition ID.
+     * @param updCntr Update counter to set.
+     */
+    public void updateCounter(int p, long updCntr) {
+        updCntrs[p] = updCntr;
+    }
+
+    /**
+     * Clears full counters map.
+     */
+    public void clear() {
+        Arrays.fill(initialUpdCntrs, 0);
+        Arrays.fill(updCntrs, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
new file mode 100644
index 0000000..c6578d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class CachePartitionPartialCountersMap {
+    /** */
+    public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap();
+
+    /** */
+    private int[] partIds;
+
+    /** */
+    private long[] initialUpdCntrs;
+
+    /** */
+    private long[] updCntrs;
+
+    /** */
+    private int curIdx;
+
+    /** */
+    private CachePartitionPartialCountersMap() {
+        // Empty map.
+    }
+
+    /**
+     * @param partsCnt Total number of partitions will be stored in the partial map.
+     */
+    public CachePartitionPartialCountersMap(int partsCnt) {
+        partIds = new int[partsCnt];
+        initialUpdCntrs = new long[partsCnt];
+        updCntrs = new long[partsCnt];
+    }
+
+    /**
+     * @return Total number of partitions added to the map.
+     */
+    public int size() {
+        return curIdx;
+    }
+
+    /**
+     * Adds partition counters for a partition with the given ID.
+     *
+     * @param partId Partition ID to add.
+     * @param initialUpdCntr Partition initial update counter.
+     * @param updCntr Partition update counter.
+     */
+    public void add(int partId, long initialUpdCntr, long updCntr) {
+        if (curIdx > 0) {
+            if (partIds[curIdx - 1] >= partId)
+                throw new IllegalArgumentException("Adding a partition in the wrong order " +
+                    "[prevPart=" + partIds[curIdx - 1] + ", partId=" + partId + ']');
+        }
+
+        if (curIdx == partIds.length)
+            throw new IllegalStateException("Adding more partitions than reserved: " + partIds.length);
+
+        partIds[curIdx] = partId;
+        initialUpdCntrs[curIdx] = initialUpdCntr;
+        updCntrs[curIdx] = updCntr;
+
+        curIdx++;
+    }
+
+    /**
+     * @param partId Partition ID to search.
+     * @return Partition index in the array.
+     */
+    public int partitionIndex(int partId) {
+        return Arrays.binarySearch(partIds, 0, curIdx, partId);
+    }
+
+    /**
+     * Gets partition ID saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Partition ID.
+     */
+    public int partitionAt(int idx) {
+        return partIds[idx];
+    }
+
+    /**
+     * Gets initial update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Initial update counter.
+     */
+    public long initialUpdateCounterAt(int idx) {
+        return initialUpdCntrs[idx];
+    }
+
+    /**
+     * Gets update counter saved at the given index.
+     *
+     * @param idx Index to get value from.
+     * @return Update counter.
+     */
+    public long updateCounterAt(int idx) {
+        return updCntrs[idx];
+    }
+
+
+    /**
+     * @param cntrsMap Partial local counters map.
+     * @return Partition ID to partition counters map.
+     */
+    public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionPartialCountersMap cntrsMap) {
+        if (cntrsMap.size() == 0)
+            return Collections.emptyMap();
+
+        Map<Integer, T2<Long, Long>> res = U.newHashMap(cntrsMap.size());
+
+        for (int idx = 0; idx < cntrsMap.size(); idx++)
+            res.put(cntrsMap.partitionAt(idx),
+                new T2<>(cntrsMap.initialUpdateCounterAt(idx), cntrsMap.updateCounterAt(idx)));
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 441952d..cae3ce2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -19,11 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Map;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -92,12 +90,6 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /**
-     * @param grpId Cache group ID.
-     * @return Parition update counters.
-     */
-    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
-
-    /**
      * @return Last used version among all nodes.
      */
     @Nullable public GridCacheVersion lastVersion() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 609021b..6789718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -624,7 +624,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (updateTop && clientTop != null) {
                     top.update(topologyVersion(),
                         clientTop.partitionMap(true),
-                        clientTop.updateCounters(false),
+                        clientTop.fullUpdateCounters(),
                         Collections.<Integer>emptySet(),
                         null);
                 }
@@ -1114,7 +1114,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         GridDhtPartitionsSingleMessage msg;
 
-        // Reset lost partition before send local partition to coordinator.
+        // Reset lost partitions before sending local partitions to coordinator.
         if (exchActions != null) {
             Set<String> caches = exchActions.cachesToResetLostPartitions();
 
@@ -1524,10 +1524,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         Map<Integer, Long> minCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
-            assert e.getValue().partitionUpdateCounters(top.groupId()) != null;
+            CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId());
 
-            for (Map.Entry<Integer, T2<Long, Long>> e0 : e.getValue().partitionUpdateCounters(top.groupId()).entrySet()) {
-                int p = e0.getKey();
+            assert nodeCntrs != null;
+
+            for (int i = 0; i < nodeCntrs.size(); i++) {
+                int p = nodeCntrs.partitionAt(i);
 
                 UUID uuid = e.getKey();
 
@@ -1536,10 +1538,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
                     continue;
 
-                Long cntr = state == GridDhtPartitionState.MOVING ? e0.getValue().get1() : e0.getValue().get2();
-
-                if (cntr == null)
-                    cntr = 0L;
+                long cntr = state == GridDhtPartitionState.MOVING ?
+                    nodeCntrs.initialUpdateCounterAt(i) :
+                    nodeCntrs.updateCounterAt(i);
 
                 Long minCntr = minCntrs.get(p);
 
@@ -1555,6 +1556,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     maxCntrs.put(p, new CounterWithNodes(cntr, uuid));
                 else if (cntr == maxCntr.cnt)
                     maxCntr.nodes.add(uuid);
+
             }
         }
 
@@ -1728,10 +1730,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         GridDhtPartitionTopology top = grp != null ? grp.topology() :
                             cctx.exchange().clientTopology(grpId, this);
 
-                        Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId);
+                        CachePartitionPartialCountersMap cntrs = msg0.partitionUpdateCounters(grpId);
 
                         if (cntrs != null)
-                            top.applyUpdateCounters(cntrs);
+                            top.collectUpdateCounters(cntrs);
                     }
                 }
             }
@@ -1965,7 +1967,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer grpId = entry.getKey();
 
-            Map<Integer, T2<Long, Long>> cntrMap = msg.partitionUpdateCounters(grpId);
+            CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId);
 
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index acc4dbe..ef3a58f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -187,7 +186,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
@@ -198,14 +197,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
-        if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
+    public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
+        CachePartitionFullCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
 
-            return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
-        }
-
-        return Collections.emptyMap();
+        return res != null ? res : CachePartitionFullCountersMap.EMPTY;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index b4d25c4..3c11fa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.util.Map;
-import java.util.HashMap;
+import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.io.Externalizable;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -60,7 +59,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Partitions update counters. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs;
+    private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
 
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
@@ -150,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
@@ -161,14 +160,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
-        if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
-
-            return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
-        }
+    public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) {
+        CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
 
-        return Collections.emptyMap();
+        return res == null ? CachePartitionPartialCountersMap.EMPTY : res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
index 4b80ee0..5fb22a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
@@ -19,9 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -53,11 +50,6 @@ public class GridDhtPartitionsSingleRequest extends GridDhtPartitionsAbstractMes
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
-        return Collections.emptyMap();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index 9db80ae..0124e80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -19,10 +19,10 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.ignite.internal.util.typedef.T2;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap.EMPTY;
 
 /**
  * Partition counters map.
@@ -32,13 +32,13 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private Map<Integer, Map<Integer, T2<Long, Long>>> map;
+    private Map<Integer, CachePartitionFullCountersMap> map;
 
     /**
      * @param cacheId Cache ID.
      * @param cntrMap Counters map.
      */
-    public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
         if (map == null)
             map = new HashMap<>();
 
@@ -50,14 +50,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
      * @param cacheId Cache ID.
      * @return Counters map.
      */
-    public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
+    public synchronized CachePartitionFullCountersMap get(int cacheId) {
         if (map == null)
             map = new HashMap<>();
 
-        Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
+        CachePartitionFullCountersMap cntrMap = map.get(cacheId);
 
         if (cntrMap == null)
-            return Collections.emptyMap();
+            return EMPTY;
 
         return cntrMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ed6eee2..83a9f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1167,7 +1167,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public Long initialUpdateCounter() {
+        @Override public long initialUpdateCounter() {
             try {
                 CacheDataStore delegate0 = init0(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7062353..be958c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -93,6 +93,7 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CONTINUOUS;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.toCountersMap;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_ACK;
 import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.MSG_EVT_NOTIFICATION;
 
@@ -202,7 +203,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                                 GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
                                 if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters(false));
+                                    cntrsPerNode.put(ctx.localNodeId(),
+                                        toCountersMap(cctx.topology().localUpdateCounters()));
 
                                 routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
@@ -1070,7 +1072,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
-                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters(false));
+                    req.addUpdateCounters(ctx.localNodeId(),
+                        toCountersMap(cache.context().topology().localUpdateCounters()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a5a4778/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 43069cd..39c31ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
@@ -519,11 +520,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
             Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
 
-            Map<Integer, T2<Long, Long>> act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().updateCounters(false);
+            CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology().localUpdateCounters();
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
-                if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode()))
-                    assertEquals(e.getValue(), act.get(e.getKey()).get2());
+                if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {
+                    int partIdx = act.partitionIndex(e.getKey());
+
+                    assertEquals(e.getValue(), (Long)act.updateCounterAt(partIdx));
+                }
             }
         }
     }


Mime
View raw message