ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [4/5] ignite git commit: IGNITE-4827: Remove compatibility logic for 1.x versions. This closes #1654.
Date Mon, 27 Mar 2017 12:18:36 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e585b56..d5f2246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,12 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -54,13 +50,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -71,12 +65,10 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.jetbrains.annotations.Nullable;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
@@ -109,18 +101,6 @@ public class GridDhtPartitionDemander {
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
-    /** Demand lock. */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private final ReadWriteLock demandLock;
-
-    /** DemandWorker index. */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private final AtomicInteger dmIdx = new AtomicInteger();
-
-    /** DemandWorker. */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private volatile DemandWorker worker;
-
     /** Cached rebalance topics. */
     private final Map<Integer, Object> rebalanceTopics;
 
@@ -138,13 +118,11 @@ public class GridDhtPartitionDemander {
 
     /**
      * @param cctx Cctx.
-     * @param demandLock Demand lock.
      */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
+    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
 
         this.cctx = cctx;
-        this.demandLock = demandLock;
 
         log = cctx.logger(getClass());
 
@@ -184,11 +162,6 @@ public class GridDhtPartitionDemander {
             rebalanceFut.onDone(false);
         }
 
-        DemandWorker dw = worker;
-
-        if (dw != null)
-            dw.cancel();
-
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -466,65 +439,47 @@ public class GridDhtPartitionDemander {
 
             GridDhtPartitionDemandMessage d = e.getValue();
 
-            //Check remote node rebalancing API version.
-            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
-                U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
-                    ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
-
-                int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
+            U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
+                ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
+                ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
-                List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
+            int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
 
-                for (int cnt = 0; cnt < lsnrCnt; cnt++)
-                    sParts.add(new HashSet<Integer>());
+            List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
-                Iterator<Integer> it = parts.iterator();
+            for (int cnt = 0; cnt < lsnrCnt; cnt++)
+                sParts.add(new HashSet<Integer>());
 
-                int cnt = 0;
+            Iterator<Integer> it = parts.iterator();
 
-                while (it.hasNext())
-                    sParts.get(cnt++ % lsnrCnt).add(it.next());
+            int cnt = 0;
 
-                for (cnt = 0; cnt < lsnrCnt; cnt++) {
-                    if (!sParts.get(cnt).isEmpty()) {
-                        // Create copy.
-                        GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+            while (it.hasNext())
+                sParts.get(cnt++ % lsnrCnt).add(it.next());
 
-                        initD.topic(rebalanceTopics.get(cnt));
-                        initD.updateSequence(fut.updateSeq);
-                        initD.timeout(cctx.config().getRebalanceTimeout());
-
-                        synchronized (fut) {
-                            if (!fut.isDone()) {
-                                // Future can be already cancelled at this moment and all failovers happened.
-                                // New requests will not be covered by failovers.
-                                cctx.io().sendOrderedMessage(node,
-                                    rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
-                            }
+            for (cnt = 0; cnt < lsnrCnt; cnt++) {
+                if (!sParts.get(cnt).isEmpty()) {
+                    // Create copy.
+                    GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
+
+                    initD.topic(rebalanceTopics.get(cnt));
+                    initD.updateSequence(fut.updateSeq);
+                    initD.timeout(cctx.config().getRebalanceTimeout());
+
+                    synchronized (fut) {
+                        if (!fut.isDone()) {
+                            // Future can be already cancelled at this moment and all failovers happened.
+                            // New requests will not be covered by failovers.
+                            cctx.io().sendOrderedMessage(node,
+                                rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
                         }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
-                                cnt + ", partitions count=" + sParts.get(cnt).size() +
-                                " (" + partitionsList(sParts.get(cnt)) + ")]");
                     }
-                }
-            }
-            else {
-                U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
-                    ", mode=" + cfg.getRebalanceMode() +
-                    ", fromNode=" + node.id() +
-                    ", partitionsCount=" + parts.size() +
-                    ", topology=" + fut.topologyVersion() +
-                    ", updateSeq=" + fut.updateSeq + "]");
 
-                d.timeout(cctx.config().getRebalanceTimeout());
-                d.workerId(0);//old api support.
-
-                worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
-
-                worker.run(node, d);
+                    if (log.isDebugEnabled())
+                        log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                            cnt + ", partitions count=" + sParts.get(cnt).size() +
+                            " (" + partitionsList(sParts.get(cnt)) + ")]");
+                }
             }
         }
     }
@@ -997,26 +952,23 @@ public class GridDhtPartitionDemander {
             if (node == null)
                 return;
 
-            //Check remote node rebalancing API version.
-            if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
-                GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                    -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+            GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
+                -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
 
-                d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(cctx.config().getRebalanceTimeout());
 
-                try {
-                    for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
-                        d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
+            try {
+                for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+                    d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                        cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                            d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
-                    }
-                }
-                catch (IgniteCheckedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send failover context cleanup request to node");
+                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
                 }
             }
+            catch (IgniteCheckedException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send failover context cleanup request to node");
+            }
         }
 
         /**
@@ -1147,373 +1099,4 @@ public class GridDhtPartitionDemander {
             return S.toString(RebalanceFuture.class, this);
         }
     }
-
-    /**
-     * Supply message wrapper.
-     */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private static class SupplyMessage {
-        /** Sender ID. */
-        private UUID sndId;
-
-        /** Supply message. */
-        private GridDhtPartitionSupplyMessage supply;
-
-        /**
-         * Dummy constructor.
-         */
-        private SupplyMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param sndId Sender ID.
-         * @param supply Supply message.
-         */
-        SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
-            this.sndId = sndId;
-            this.supply = supply;
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return sndId;
-        }
-
-        /**
-         * @return Message.
-         */
-        GridDhtPartitionSupplyMessage supply() {
-            return supply;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(SupplyMessage.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private class DemandWorker {
-        /** Worker ID. */
-        private int id;
-
-        /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
-
-        /** Message queue. */
-        private final LinkedBlockingDeque<SupplyMessage> msgQ =
-            new LinkedBlockingDeque<>();
-
-        /** Counter. */
-        private long cntr;
-
-        /** Hide worker logger and use cache logger instead. */
-        private IgniteLogger log = GridDhtPartitionDemander.this.log;
-
-        /** */
-        private volatile RebalanceFuture fut;
-
-        /**
-         * @param id Worker ID.
-         * @param fut Rebalance future.
-         */
-        private DemandWorker(int id, RebalanceFuture fut) {
-            assert id >= 0;
-
-            this.id = id;
-            this.fut = fut;
-        }
-
-        /**
-         * @param msg Message.
-         */
-        private void addMessage(SupplyMessage msg) {
-            msgQ.offer(msg);
-        }
-
-        /**
-         * @param deque Deque to poll from.
-         * @param time Time to wait.
-         * @return Polled item.
-         * @throws InterruptedException If interrupted.
-         */
-        @Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
-            return deque.poll(time, MILLISECONDS);
-        }
-
-        /**
-         * @param idx Unique index for this topic.
-         * @return Topic for partition.
-         */
-        public Object topic(long idx) {
-            return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
-        }
-
-        /** */
-        public void cancel() {
-            msgQ.clear();
-
-            msgQ.offer(new SupplyMessage(null, null));
-        }
-
-        /**
-         * @param node Node to demand from.
-         * @param topVer Topology version.
-         * @param d Demand message.
-         * @param exchFut Exchange future.
-         * @throws InterruptedException If interrupted.
-         * @throws ClusterTopologyCheckedException If node left.
-         * @throws IgniteCheckedException If failed to send message.
-         */
-        private void demandFromNode(
-            ClusterNode node,
-            final AffinityTopologyVersion topVer,
-            GridDhtPartitionDemandMessage d,
-            GridDhtPartitionsExchangeFuture exchFut
-        ) throws InterruptedException, IgniteCheckedException {
-            GridDhtPartitionTopology top = cctx.dht().topology();
-
-            cntr++;
-
-            d.topic(topic(cntr));
-            d.workerId(id);
-
-            if (fut.isDone() || topologyChanged(fut))
-                return;
-
-            cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
-                    addMessage(new SupplyMessage(nodeId, msg));
-                }
-            });
-
-            try {
-                boolean retry;
-
-                // DoWhile.
-                // =======
-                do {
-                    retry = false;
-
-                    // Create copy.
-                    d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
-
-                    long timeout = cctx.config().getRebalanceTimeout();
-
-                    d.timeout(timeout);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
-
-                    // Send demand message.
-                    cctx.io().send(node, d, cctx.ioPolicy());
-
-                    // While.
-                    // =====
-                    while (!fut.isDone() && !topologyChanged(fut)) {
-                        SupplyMessage s = poll(msgQ, timeout);
-
-                        // If timed out.
-                        if (s == null) {
-                            if (msgQ.isEmpty()) { // Safety check.
-                                U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
-                                    " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
-                                    " configuration properties).");
-
-                                // Ordered listener was removed if timeout expired.
-                                cctx.io().removeOrderedHandler(d.topic());
-
-                                // Must create copy to be able to work with IO manager thread local caches.
-                                d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
-
-                                // Create new topic.
-                                d.topic(topic(++cntr));
-
-                                // Create new ordered listener.
-                                cctx.io().addOrderedHandler(d.topic(),
-                                    new CI2<UUID, GridDhtPartitionSupplyMessage>() {
-                                        @Override public void apply(UUID nodeId,
-                                            GridDhtPartitionSupplyMessage msg) {
-                                            addMessage(new SupplyMessage(nodeId, msg));
-                                        }
-                                    });
-
-                                // Resend message with larger timeout.
-                                retry = true;
-
-                                break; // While.
-                            }
-                            else
-                                continue; // While.
-                        }
-
-                        if (s.senderId() == null)
-                            return; // Stopping now.
-
-                        // Check that message was received from expected node.
-                        if (!s.senderId().equals(node.id())) {
-                            U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
-                                ", rcvdId=" + s.senderId() + ", msg=" + s + ']');
-
-                            continue; // While.
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received supply message: " + s);
-
-                        GridDhtPartitionSupplyMessage supply = s.supply();
-
-                        // Check whether there were class loading errors on unmarshal
-                        if (supply.classError() != null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Class got undeployed during preloading: " + supply.classError());
-
-                            retry = true;
-
-                            // Quit preloading.
-                            break;
-                        }
-
-                        // Preload.
-                        for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
-                            int p = e.getKey();
-
-                            if (cctx.affinity().partitionLocalNode(p, topVer)) {
-                                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
-
-                                assert part != null;
-
-                                if (part.state() == MOVING) {
-                                    boolean reserved = part.reserve();
-
-                                    assert reserved : "Failed to reserve partition [igniteInstanceName=" +
-                                        cctx.igniteInstanceName() + ", cacheName=" + cctx.namex() +
-                                        ", part=" + part + ']';
-
-                                    part.lock();
-
-                                    try {
-                                        Collection<Integer> invalidParts = new GridLeanSet<>();
-
-                                        // Loop through all received entries and try to preload them.
-                                        for (GridCacheEntryInfo entry : e.getValue().infos()) {
-                                            if (!invalidParts.contains(p)) {
-                                                if (!part.preloadingPermitted(entry.key(), entry.version())) {
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Preloading is not permitted for entry due to " +
-                                                            "evictions [key=" + entry.key() +
-                                                            ", ver=" + entry.version() + ']');
-
-                                                    continue;
-                                                }
-
-                                                if (!preloadEntry(node, p, entry, topVer)) {
-                                                    invalidParts.add(p);
-
-                                                    if (log.isDebugEnabled())
-                                                        log.debug("Got entries for invalid partition during " +
-                                                            "preloading (will skip) [p=" + p + ", entry=" + entry + ']');
-                                                }
-                                            }
-                                        }
-
-                                        boolean last = supply.last().contains(p);
-
-                                        // If message was last for this partition,
-                                        // then we take ownership.
-                                        if (last) {
-                                            fut.partitionDone(node.id(), p);
-
-                                            top.own(part);
-
-                                            if (log.isDebugEnabled())
-                                                log.debug("Finished rebalancing partition: " + part);
-
-                                            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                                                preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                                                    exchFut.discoveryEvent());
-                                        }
-                                    }
-                                    finally {
-                                        part.unlock();
-                                        part.release();
-                                    }
-                                }
-                                else {
-                                    fut.partitionDone(node.id(), p);
-
-                                    if (log.isDebugEnabled())
-                                        log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
-                                }
-                            }
-                            else {
-                                fut.partitionDone(node.id(), p);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
-                            }
-                        }
-
-                        // Only request partitions based on latest topology version.
-                        for (Integer miss : s.supply().missed()) {
-                            if (cctx.affinity().partitionLocalNode(miss, topVer))
-                                fut.partitionMissed(node.id(), miss);
-                        }
-
-                        for (Integer miss : s.supply().missed())
-                            fut.partitionDone(node.id(), miss);
-
-                        if (fut.remaining.get(node.id()) == null)
-                            break; // While.
-
-                        if (s.supply().ack()) {
-                            retry = true;
-
-                            break;
-                        }
-                    }
-                }
-                while (retry && !fut.isDone() && !topologyChanged(fut));
-            }
-            finally {
-                cctx.io().removeOrderedHandler(d.topic());
-            }
-        }
-
-        /**
-         * @param node Node.
-         * @param d D.
-         * @throws IgniteCheckedException If failed.
-         */
-        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
-            demandLock.readLock().lock();
-
-            try {
-                GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
-
-                AffinityTopologyVersion topVer = fut.topVer;
-
-                try {
-                    demandFromNode(node, topVer, d, exchFut);
-                }
-                catch (InterruptedException e) {
-                    throw new IgniteCheckedException(e);
-                }
-            }
-            finally {
-                demandLock.readLock().unlock();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index e8860f2..27e6777 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 /**
  * Full partition map.
  */
-public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
+public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap>
     implements Comparable<GridDhtPartitionFullMap>, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -65,32 +65,9 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
      * @param nodeOrder Node order.
      * @param updateSeq Update sequence number.
      * @param m Map to copy.
-     */
-    @Deprecated // Backward compatibility.
-    public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m) {
-        assert nodeId != null;
-        assert updateSeq > 0;
-        assert nodeOrder > 0;
-
-        this.nodeId = nodeId;
-        this.nodeOrder = nodeOrder;
-        this.updateSeq = updateSeq;
-
-        for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
-            GridDhtPartitionMap2 part = e.getValue();
-
-            put(e.getKey(), new GridDhtPartitionMap(part.nodeId(), part.updateSequence(), part.map()));
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param nodeOrder Node order.
-     * @param updateSeq Update sequence number.
-     * @param m Map to copy.
      * @param onlyActive If {@code true}, then only active partitions will be included.
      */
-    public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap2> m,
+    public GridDhtPartitionFullMap(UUID nodeId, long nodeOrder, long updateSeq, Map<UUID, GridDhtPartitionMap> m,
         boolean onlyActive) {
         assert nodeId != null;
         assert updateSeq > 0;
@@ -100,10 +77,10 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         this.nodeOrder = nodeOrder;
         this.updateSeq = updateSeq;
 
-        for (Map.Entry<UUID, GridDhtPartitionMap2> e : m.entrySet()) {
-            GridDhtPartitionMap2 part = e.getValue();
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : m.entrySet()) {
+            GridDhtPartitionMap part = e.getValue();
 
-            GridDhtPartitionMap2 cpy = new GridDhtPartitionMap2(part.nodeId(),
+            GridDhtPartitionMap cpy = new GridDhtPartitionMap(part.nodeId(),
                 part.updateSequence(),
                 part.topologyVersion(),
                 part.map(),
@@ -168,8 +145,8 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         if (size() != fullMap.size())
             return false;
 
-        for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
-            GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : entrySet()) {
+            GridDhtPartitionMap m = fullMap.get(e.getKey());
 
             if (m == null || !m.map().equals(e.getValue().map()))
                 return false;
@@ -238,7 +215,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         nodeOrder = in.readLong();
         updateSeq = in.readLong();
 
-        putAll(U.<UUID, GridDhtPartitionMap2>readMap(in));
+        putAll(U.<UUID, GridDhtPartitionMap>readMap(in));
     }
 
     /** {@inheritDoc} */
@@ -260,7 +237,7 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
      * @return Map string representation.
      */
     public String map2string() {
-        Iterator<Map.Entry<UUID, GridDhtPartitionMap2>> it = entrySet().iterator();
+        Iterator<Map.Entry<UUID, GridDhtPartitionMap>> it = entrySet().iterator();
 
         if (!it.hasNext())
             return "{}";
@@ -270,11 +247,11 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         buf.append('{');
 
         while(true) {
-            Map.Entry<UUID, GridDhtPartitionMap2> e = it.next();
+            Map.Entry<UUID, GridDhtPartitionMap> e = it.next();
 
             UUID nodeId = e.getKey();
 
-            GridDhtPartitionMap2 partMap = e.getValue();
+            GridDhtPartitionMap partMap = e.getValue();
 
             buf.append(nodeId).append('=').append(partMap.toFullString());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index 3096d63..43087ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -22,46 +22,215 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
+
 /**
  * Partition map.
  */
-@Deprecated // Backward compatibility, use GridDhtPartitionMap2 instead.
-public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
+public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Node ID. */
+    protected UUID nodeId;
+
+    /** Update sequence number. */
+    protected long updateSeq;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion top;
+
+    /** */
+    protected Map<Integer, GridDhtPartitionState> map;
+
+    /** */
+    private volatile int moving;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtPartitionMap() {
+        // No-op.
+    }
+
     /**
      * @param nodeId Node ID.
      * @param updateSeq Update sequence number.
+     * @param top Topology version.
      * @param m Map to copy.
+     * @param onlyActive If {@code true}, then only active states will be included.
      */
-    public GridDhtPartitionMap(UUID nodeId, long updateSeq,
-        Map<Integer, GridDhtPartitionState> m) {
+    public GridDhtPartitionMap(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> m,
+        boolean onlyActive) {
         assert nodeId != null;
         assert updateSeq > 0;
 
         this.nodeId = nodeId;
         this.updateSeq = updateSeq;
+        this.top = top;
 
         map = U.newHashMap(m.size());
 
         for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
             GridDhtPartitionState state = e.getValue();
 
-            put(e.getKey(), state);
+            if (!onlyActive || state.active())
+                put(e.getKey(), state);
         }
     }
 
     /**
-     * Empty constructor required for {@link Externalizable}.
+     * @param nodeId Node ID.
+     * @param updateSeq Update sequence number.
+     * @param top Topology version.
+     * @param map Map.
+     * @param moving Number of moving partitions.
      */
-    public GridDhtPartitionMap() {
-        // No-op.
+    private GridDhtPartitionMap(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> map,
+        int moving) {
+        this.nodeId = nodeId;
+        this.updateSeq = updateSeq;
+        this.top = top;
+        this.map = map;
+        this.moving = moving;
+    }
+
+    /**
+     * @return Copy with empty partition state map.
+     */
+    public GridDhtPartitionMap emptyCopy() {
+        return new GridDhtPartitionMap(nodeId,
+            updateSeq,
+            top,
+            U.<Integer, GridDhtPartitionState>newHashMap(0),
+            0);
+    }
+
+    /**
+     * @param part Partition.
+     * @param state Partition state.
+     */
+    public void put(Integer part, GridDhtPartitionState state) {
+        GridDhtPartitionState old = map.put(part, state);
+
+        if (old == MOVING)
+            moving--;
+
+        if (state == MOVING)
+            moving++;
+    }
+
+    /**
+     * @return {@code true} If partition map contains moving partitions.
+     */
+    public boolean hasMovingPartitions() {
+        assert moving >= 0 : moving;
+
+        return moving != 0;
+    }
+
+    /**
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState get(Integer part) {
+        return map.get(part);
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if contains given partition.
+     */
+    public boolean containsKey(Integer part) {
+        return map.containsKey(part);
+    }
+
+    /**
+     * @return Entries.
+     */
+    public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() {
+        return map.entrySet();
+    }
+
+    /**
+     * @return Map size.
+     */
+    public int size() {
+        return map.size();
+    }
+
+    /**
+     * @return Partitions.
+     */
+    public Set<Integer> keySet() {
+        return map.keySet();
+    }
+
+    /**
+     * @return Underlying map.
+     */
+    public Map<Integer, GridDhtPartitionState> map() {
+        return map;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Update sequence.
+     */
+    public long updateSequence() {
+        return updateSeq;
+    }
+
+    /**
+     * @param updateSeq New update sequence value.
+     * @param topVer Current topology version.
+     * @return Old update sequence value.
+     */
+    public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
+        long old = this.updateSeq;
+
+        assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
+
+        this.updateSeq = updateSeq;
+
+        top = topVer;
+
+        return old;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return top;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(GridDhtPartitionMap o) {
+        assert nodeId.equals(o.nodeId);
+
+        return Long.compare(updateSeq, o.updateSeq);
     }
 
     /** {@inheritDoc} */
@@ -80,7 +249,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
             int ordinal = entry.getValue().ordinal();
 
             assert ordinal == (ordinal & 0x3);
-            assert entry.getKey() == (entry.getKey() & 0x3FFF);
+            assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
 
             int coded = (ordinal << 14) | entry.getKey();
 
@@ -90,6 +259,15 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
         }
 
         assert i == size;
+
+        if (top != null) {
+            out.writeLong(topologyVersion().topologyVersion());
+            out.writeInt(topologyVersion().minorTopologyVersion());
+        }
+        else {
+            out.writeLong(0);
+            out.writeInt(0);
+        }
     }
 
     /** {@inheritDoc} */
@@ -110,6 +288,12 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
 
             put(part, GridDhtPartitionState.fromOrdinal(ordinal));
         }
+
+        long ver = in.readLong();
+        int minorVer = in.readInt();
+
+        if (ver != 0)
+            top = new AffinityTopologyVersion(ver, minorVer);
     }
 
     /** {@inheritDoc} */
@@ -117,7 +301,7 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
         if (this == o)
             return true;
 
-        GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o;
+        GridDhtPartitionMap other = (GridDhtPartitionMap)o;
 
         return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq;
     }
@@ -131,11 +315,11 @@ public class GridDhtPartitionMap extends GridDhtPartitionMap2 {
      * @return Full string representation.
      */
     public String toFullString() {
-        return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString());
+        return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString(), "top", top);
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtPartitionMap2.class, this, "size", size());
+        return S.toString(GridDhtPartitionMap.class, this, "size", size());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
deleted file mode 100644
index 7d6f272..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
-
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
-
-/**
- * Partition map.
- */
-public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Used since. */
-    public static final IgniteProductVersion SINCE = IgniteProductVersion.fromString("1.5.0");
-
-    /** Node ID. */
-    protected UUID nodeId;
-
-    /** Update sequence number. */
-    protected long updateSeq;
-
-    /** Topology version. */
-    protected AffinityTopologyVersion top;
-
-    /** */
-    protected Map<Integer, GridDhtPartitionState> map;
-
-    /** */
-    private volatile int moving;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtPartitionMap2() {
-        // No-op.
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param updateSeq Update sequence number.
-     * @param top Topology version.
-     * @param m Map to copy.
-     * @param onlyActive If {@code true}, then only active states will be included.
-     */
-    public GridDhtPartitionMap2(UUID nodeId,
-        long updateSeq,
-        AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> m,
-        boolean onlyActive) {
-        assert nodeId != null;
-        assert updateSeq > 0;
-
-        this.nodeId = nodeId;
-        this.updateSeq = updateSeq;
-        this.top = top;
-
-        map = U.newHashMap(m.size());
-
-        for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
-            GridDhtPartitionState state = e.getValue();
-
-            if (!onlyActive || state.active())
-                put(e.getKey(), state);
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param updateSeq Update sequence number.
-     * @param top Topology version.
-     * @param map Map.
-     * @param moving Number of moving partitions.
-     */
-    private GridDhtPartitionMap2(UUID nodeId,
-        long updateSeq,
-        AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> map,
-        int moving) {
-        this.nodeId = nodeId;
-        this.updateSeq = updateSeq;
-        this.top = top;
-        this.map = map;
-        this.moving = moving;
-    }
-
-    /**
-     * @return Copy with empty partition state map.
-     */
-    public GridDhtPartitionMap2 emptyCopy() {
-        return new GridDhtPartitionMap2(nodeId,
-            updateSeq,
-            top,
-            U.<Integer, GridDhtPartitionState>newHashMap(0),
-            0);
-    }
-
-    /**
-     * @param part Partition.
-     * @param state Partition state.
-     */
-    public void put(Integer part, GridDhtPartitionState state) {
-        GridDhtPartitionState old = map.put(part, state);
-
-        if (old == MOVING)
-            moving--;
-
-        if (state == MOVING)
-            moving++;
-    }
-
-    /**
-     * @return {@code true} If partition map contains moving partitions.
-     */
-    public boolean hasMovingPartitions() {
-        assert moving >= 0 : moving;
-
-        return moving != 0;
-    }
-
-    /**
-     * @param part Partition.
-     * @return Partition state.
-     */
-    public GridDhtPartitionState get(Integer part) {
-        return map.get(part);
-    }
-
-    /**
-     * @param part Partition.
-     * @return {@code True} if contains given partition.
-     */
-    public boolean containsKey(Integer part) {
-        return map.containsKey(part);
-    }
-
-    /**
-     * @return Entries.
-     */
-    public Set<Map.Entry<Integer, GridDhtPartitionState>> entrySet() {
-        return map.entrySet();
-    }
-
-    /**
-     * @return Map size.
-     */
-    public int size() {
-        return map.size();
-    }
-
-    /**
-     * @return Partitions.
-     */
-    public Set<Integer> keySet() {
-        return map.keySet();
-    }
-
-    /**
-     * @return Underlying map.
-     */
-    public Map<Integer, GridDhtPartitionState> map() {
-        return map;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Update sequence.
-     */
-    public long updateSequence() {
-        return updateSeq;
-    }
-
-    /**
-     * @param updateSeq New update sequence value.
-     * @param topVer Current topology version.
-     * @return Old update sequence value.
-     */
-    public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) {
-        long old = this.updateSeq;
-
-        assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']';
-
-        this.updateSeq = updateSeq;
-
-        top = topVer;
-
-        return old;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    public AffinityTopologyVersion topologyVersion() {
-        return top;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(GridDhtPartitionMap2 o) {
-        assert nodeId.equals(o.nodeId);
-
-        return Long.compare(updateSeq, o.updateSeq);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeUuid(out, nodeId);
-
-        out.writeLong(updateSeq);
-
-        int size = map.size();
-
-        out.writeInt(size);
-
-        int i = 0;
-
-        for (Map.Entry<Integer, GridDhtPartitionState> entry : map.entrySet()) {
-            int ordinal = entry.getValue().ordinal();
-
-            assert ordinal == (ordinal & 0x3);
-            assert entry.getKey() < CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
-
-            int coded = (ordinal << 14) | entry.getKey();
-
-            out.writeShort((short)coded);
-
-            i++;
-        }
-
-        assert i == size;
-
-        if (top != null) {
-            out.writeLong(topologyVersion().topologyVersion());
-            out.writeInt(topologyVersion().minorTopologyVersion());
-        }
-        else {
-            out.writeLong(0);
-            out.writeInt(0);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        nodeId = U.readUuid(in);
-
-        updateSeq = in.readLong();
-
-        int size = in.readInt();
-
-        map = U.newHashMap(size);
-
-        for (int i = 0; i < size; i++) {
-            int entry = in.readShort() & 0xFFFF;
-
-            int part = entry & 0x3FFF;
-            int ordinal = entry >> 14;
-
-            put(part, GridDhtPartitionState.fromOrdinal(ordinal));
-        }
-
-        long ver = in.readLong();
-        int minorVer = in.readInt();
-
-        if (ver != 0)
-            top = new AffinityTopologyVersion(ver, minorVer);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        GridDhtPartitionMap2 other = (GridDhtPartitionMap2)o;
-
-        return other.nodeId.equals(nodeId) && other.updateSeq == updateSeq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return 31 * nodeId.hashCode() + (int)(updateSeq ^ (updateSeq >>> 32));
-    }
-
-    /**
-     * @return Full string representation.
-     */
-    public String toFullString() {
-        return S.toString(GridDhtPartitionMap2.class, this, "size", size(), "map", map.toString(), "top", top);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtPartitionMap2.class, this, "size", size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 6e69161..1f3dee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -32,9 +31,6 @@ import org.jetbrains.annotations.Nullable;
  */
 public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
     /** */
-    public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
-
-    /** */
     protected static final byte COMPRESSED_FLAG_MASK = 1;
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 5eacc36..f41da2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1476,7 +1476,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param msg Partitions single message.
      */
     private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) {
-        for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) {
+        for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8a7adfc..33c23e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -241,13 +241,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                     assert map2 != null : e.getValue();
                     assert map1.size() == map2.size();
 
-                    for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map2.entrySet()) {
-                        GridDhtPartitionMap2 partMap1 = map1.get(e0.getKey());
+                    for (Map.Entry<UUID, GridDhtPartitionMap> e0 : map2.entrySet()) {
+                        GridDhtPartitionMap partMap1 = map1.get(e0.getKey());
 
                         assert partMap1 != null && partMap1.map().isEmpty() : partMap1;
                         assert !partMap1.hasMovingPartitions() : partMap1;
 
-                        GridDhtPartitionMap2 partMap2 = e0.getValue();
+                        GridDhtPartitionMap partMap2 = e0.getValue();
 
                         assert partMap2 != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index d65e405..da7403e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -47,7 +47,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Local partitions. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, GridDhtPartitionMap2> parts;
+    private Map<Integer, GridDhtPartitionMap> parts;
 
     /** */
     @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
@@ -106,7 +106,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param locMap Local partition map.
      * @param dupDataCache Optional ID of cache with the same partition state map.
      */
-    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, @Nullable Integer dupDataCache) {
+    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap locMap, @Nullable Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
@@ -152,7 +152,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /**
      * @return Local partitions.
      */
-    public Map<Integer, GridDhtPartitionMap2> partitions() {
+    public Map<Integer, GridDhtPartitionMap> partitions() {
         return parts;
     }
 
@@ -217,13 +217,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             assert parts != null;
 
             for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
-                GridDhtPartitionMap2 map1 = parts.get(e.getKey());
+                GridDhtPartitionMap map1 = parts.get(e.getKey());
 
                 assert map1 != null : e.getKey();
                 assert F.isEmpty(map1.map());
                 assert !map1.hasMovingPartitions();
 
-                GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+                GridDhtPartitionMap map2 = parts.get(e.getValue());
 
                 assert map2 != null : e.getValue();
                 assert map2.map() != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 1d88742..dc988bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -62,7 +61,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -80,13 +78,6 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
  * DHT cache preloader.
  */
 public class GridDhtPreloader extends GridCachePreloaderAdapter {
-    /**
-     * Rebalancing was refactored at version 1.5.0, but backward compatibility to previous implementation was saved.
-     * Node automatically chose communication protocol depends on remote node's version.
-     * Backward compatibility may be removed at Ignite 2.x.
-     */
-    public static final IgniteProductVersion REBALANCING_VER_2_SINCE = IgniteProductVersion.fromString("1.5.0");
-
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
@@ -194,7 +185,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         cctx.shared().affinity().onCacheCreated(cctx);
 
         supplier = new GridDhtPartitionSupplier(cctx);
-        demander = new GridDhtPartitionDemander(cctx, demandLock);
+        demander = new GridDhtPartitionDemander(cctx);
 
         supplier.start();
         demander.start();
@@ -619,14 +610,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 AffinityAssignment assignment = cctx.affinity().assignment(topVer);
 
-                boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
-
                 GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
                     topVer,
-                    assignment.assignment(),
-                    newAffMode);
+                    assignment.assignment());
 
-                if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) {
+                if (cctx.affinity().affinityCache().centralizedAffinityFunction()) {
                     assert assignment.idealAssignment() != null;
 
                     res.idealAffinityAssignment(assignment.idealAssignment());

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
index 4dd7978..76147ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -34,9 +33,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class CacheContinuousQueryBatchAck extends GridCacheMessage {
     /** */
-    public static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.0");
-
-    /** */
     private static final long serialVersionUID = 0L;
 
     /** Routine ID. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 485059f..6c8df14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1324,7 +1324,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                     for (AffinityTopologyVersion topVer : t.get2()) {
                         for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
-                            if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
+                            if (!node.isLocal()) {
                                 try {
                                     cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6887a50..745bbde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,14 +58,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
-import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
-import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
@@ -78,7 +77,6 @@ import static javax.cache.event.EventType.REMOVED;
 import static javax.cache.event.EventType.UPDATED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
-import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.QUERY_MSG_VER_2_SINCE;
 
 /**
  * Continuous queries manager.
@@ -424,49 +422,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         final boolean keepBinary,
         final boolean includeExpired) throws IgniteCheckedException
     {
-        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr;
+        IgniteOutClosure<CacheContinuousQueryHandler> clsr;
 
         if (rmtFilterFactory != null)
-            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+            clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply() {
                     CacheContinuousQueryHandler hnd;
 
-                    if (v2)
-                        hnd = new CacheContinuousQueryHandlerV2(
-                            cctx.name(),
-                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                            locLsnr,
-                            rmtFilterFactory,
-                            true,
-                            false,
-                            !includeExpired,
-                            false,
-                            null);
-                    else {
-                        CacheEntryEventFilter fltr = rmtFilterFactory.create();
-
-                        if (!(fltr instanceof CacheEntryEventSerializableFilter))
-                            throw new IgniteException("Topology has nodes of the old versions. In this case " +
-                                "EntryEventFilter should implement " +
-                                "org.apache.ignite.cache.CacheEntryEventSerializableFilter interface. Filter: " + fltr);
-
-                        hnd = new CacheContinuousQueryHandler(
-                            cctx.name(),
-                            TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
-                            locLsnr,
-                            (CacheEntryEventSerializableFilter)fltr,
-                            true,
-                            false,
-                            !includeExpired,
-                            false);
-                    }
+                    hnd = new CacheContinuousQueryHandlerV2(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locLsnr,
+                        rmtFilterFactory,
+                        true,
+                        false,
+                        !includeExpired,
+                        false,
+                        null);
 
                     return hnd;
                 }
             };
         else
-            clsr = new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                @Override public CacheContinuousQueryHandler apply(Boolean ignore) {
+            clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply() {
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -509,8 +488,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     {
         return executeQuery0(
             locLsnr,
-            new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+            new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply() {
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -603,18 +582,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
     /**
      * @param locLsnr Local listener.
+     * @param clsr Closure to create CacheContinuousQueryHandler.
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
      * @param internal Internal flag.
      * @param notifyExisting Notify existing flag.
      * @param loc Local flag.
+     * @param keepBinary Keep binary flag.
      * @param onStart Waiting topology exchange.
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
     private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
-        IgniteClosure<Boolean, CacheContinuousQueryHandler> clsr,
+        IgniteOutClosure<CacheContinuousQueryHandler> clsr,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
@@ -631,9 +612,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 
-        boolean v2 = useV2Protocol(cctx.discovery().allNodes());
-
-        final CacheContinuousQueryHandler hnd = clsr.apply(v2);
+        final CacheContinuousQueryHandler hnd = clsr.apply();
 
         hnd.taskNameHash(taskNameHash);
         hnd.skipPrimaryCheck(skipPrimaryCheck);
@@ -799,20 +778,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * @param nodes Nodes.
-     * @return {@code True} if all nodes greater than {@link GridContinuousProcessor#QUERY_MSG_VER_2_SINCE},
-     *     otherwise {@code false}.
-     */
-    private boolean useV2Protocol(Collection<ClusterNode> nodes) {
-        for (ClusterNode node : nodes) {
-            if (QUERY_MSG_VER_2_SINCE.compareTo(node.version()) > 0)
-                return false;
-        }
-
-        return true;
-    }
-
-    /**
      * @param lsnrId Listener ID.
      * @param lsnr Listener.
      * @param internal Internal flag.
@@ -922,14 +887,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
             routineId = executeQuery0(
                 locLsnr,
-                new IgniteClosure<Boolean, CacheContinuousQueryHandler>() {
-                    @Override public CacheContinuousQueryHandler apply(Boolean v2) {
+                new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                    @Override public CacheContinuousQueryHandler apply() {
                         CacheContinuousQueryHandler hnd;
                         Factory<CacheEntryEventFilter> rmtFilterFactory = cfg.getCacheEntryEventFilterFactory();
 
-                        v2 = rmtFilterFactory != null && v2;
-
-                        if (v2)
+                        if (rmtFilterFactory != null)
                             hnd = new CacheContinuousQueryHandlerV2(
                                 cctx.name(),
                                 TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),

http://git-wip-us.apache.org/repos/asf/ignite/blob/488b25e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index ca4edb6..3814731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -37,7 +37,6 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSession;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -67,7 +66,6 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.transactions.Transaction;
@@ -87,13 +85,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     /** */
     private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key();
 
-    /**
-     * Behavior can be changed by setting {@link IgniteSystemProperties#IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY} property
-     * to {@code True}.
-     */
-    private static final IgniteProductVersion LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE =
-        IgniteProductVersion.fromString("1.5.22");
-
     /** */
     protected CacheStore<Object, Object> store;
 
@@ -237,22 +228,6 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             globalSesLsnrs = true;
         }
-
-        if (isLocal()) {
-            for (ClusterNode node : cctx.kernalContext().cluster().get().forRemotes().nodes()) {
-                if (LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE.compareTo(node.version()) > 0 &&
-                    !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY)) {
-                    IgniteProductVersion v = LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE;
-
-                    log.warning("Since Ignite " + v.major() + "." + v.minor() + "." + v.maintenance() +
-                        " Local Store keeps primary and backup partitions. " +
-                        "To keep primary partitions only please set system property " +
-                        IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY + " to 'true'.");
-
-                    break;
-                }
-            }
-        }
     }
 
     /** {@inheritDoc} */


Mime
View raw message