ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject ignite git commit: finalizing changes
Date Thu, 26 Jan 2017 14:44:03 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-balance-master abb95fac5 -> 1b88631d1


finalizing changes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b88631d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b88631d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b88631d

Branch: refs/heads/ignite-comm-balance-master
Commit: 1b88631d1288d3969f4e6b821d3ff58f41a835ef
Parents: abb95fa
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Thu Jan 26 17:43:52 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Thu Jan 26 17:43:52 2017 +0300

----------------------------------------------------------------------
 .../client/util/GridClientConsistentHash.java   |  14 +-
 .../ignite/internal/client/util/MpscQueue.java  | 289 -------------------
 .../managers/communication/GridIoManager.java   |   7 +
 .../discovery/GridDiscoveryManager.java         |  45 ++-
 .../affinity/GridAffinityAssignmentCache.java   |   8 +-
 .../cache/CacheAffinitySharedManager.java       |   2 +-
 .../processors/cache/GridCacheAdapter.java      |  18 --
 .../processors/cache/GridCacheIoManager.java    |   7 +
 .../processors/cache/IgniteCacheProxy.java      |   8 -
 .../dht/GridClientPartitionTopology.java        |   1 -
 .../dht/GridDhtAssignmentFetchFuture.java       |   7 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   3 -
 .../dht/atomic/GridDhtAtomicCache.java          |  15 +-
 .../GridNearAtomicSingleUpdateFuture.java       |  77 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  78 +++--
 .../cache/transactions/IgniteTxHandler.java     |   4 +-
 .../clock/GridClockSyncProcessor.java           |  52 ++--
 .../ignite/internal/util/StripedExecutor.java   |  42 +--
 .../discovery/GridDiscoveryManagerSelfTest.java | 116 --------
 19 files changed, 151 insertions(+), 642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
index 8134906..0c9a3fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/util/GridClientConsistentHash.java
@@ -439,13 +439,9 @@ public class GridClientConsistentHash<N> {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        StringBuilder sb = new StringBuilder(getClass().getSimpleName());
-
-        sb.append(" [affSeed=").append(affSeed).
-            append(", circle=").append(circle).
-            append(", nodesComp=").append(nodesComp).
-            append(", nodes=").append(nodes).append("]");
-
-        return sb.toString();
+        return getClass().getSimpleName() + " [affSeed=" + affSeed +
+            ", circle=" + circle +
+            ", nodesComp=" + nodesComp +
+            ", nodes=" + nodes + "]";
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
deleted file mode 100644
index 8821f66..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/util/MpscQueue.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.util;
-
-import java.util.AbstractQueue;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.util.typedef.internal.A;
-
-import static java.util.concurrent.locks.LockSupport.park;
-import static java.util.concurrent.locks.LockSupport.unpark;
-
-/**
- * Multi producer single consumer queue.
- */
-public class MpscQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
-    static final int INITIAL_ARRAY_SIZE = 512;
-    static final Node BLOCKED = new Node();
-
-    final AtomicReference<Node> putStack = new AtomicReference<Node>();
-    private final AtomicInteger takeStackSize = new AtomicInteger();
-
-    private Thread consumerThread;
-    private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
-    private int takeStackIndex = -1;
-
-    static int nextPowerOfTwo(final int value) {
-        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
-    }
-
-    public void setConsumerThread(Thread consumerThread) {
-        this.consumerThread = consumerThread;
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * This call is threadsafe; but it will only remove the items that are on the put-stack.
-     */
-    @Override
-    public void clear() {
-        putStack.set(BLOCKED);
-    }
-
-    @Override
-    public boolean offer(E item) {
-        A.notNull(item, "item can't be null");
-
-        AtomicReference<Node> putStack = this.putStack;
-        Node newHead = new Node();
-        newHead.item = item;
-
-        for (; ; ) {
-            Node oldHead = putStack.get();
-            if (oldHead == null || oldHead == BLOCKED) {
-                newHead.next = null;
-                newHead.size = 1;
-            } else {
-                newHead.next = oldHead;
-                newHead.size = oldHead.size + 1;
-            }
-
-            if (!putStack.compareAndSet(oldHead, newHead)) {
-                continue;
-            }
-
-            if (oldHead == BLOCKED) {
-                unpark(consumerThread);
-            }
-
-            return true;
-        }
-    }
-
-    @Override
-    public E peek() {
-        E item = peekNext();
-        if (item != null) {
-            return item;
-        }
-        if (!drainPutStack()) {
-            return null;
-        }
-        return peekNext();
-    }
-
-    @Override
-    public E take() throws InterruptedException {
-        E item = next();
-        if (item != null) {
-            return item;
-        }
-
-        takeAll();
-        assert takeStackIndex == 0;
-        assert takeStack[takeStackIndex] != null;
-
-        return next();
-    }
-
-    @Override
-    public E poll() {
-        E item = next();
-
-        if (item != null) {
-            return item;
-        }
-
-        if (!drainPutStack()) {
-            return null;
-        }
-
-        return next();
-    }
-
-    private E next() {
-        E item = peekNext();
-        if (item != null) {
-            dequeue();
-        }
-        return item;
-    }
-
-    private E peekNext() {
-        if (takeStackIndex == -1) {
-            return null;
-        }
-
-        if (takeStackIndex == takeStack.length) {
-            takeStackIndex = -1;
-            return null;
-        }
-
-        E item = (E) takeStack[takeStackIndex];
-        if (item == null) {
-            takeStackIndex = -1;
-            return null;
-        }
-        return item;
-    }
-
-    private void dequeue() {
-        takeStack[takeStackIndex] = null;
-        takeStackIndex++;
-        takeStackSize.lazySet(takeStackSize.get() - 1);
-    }
-
-    private void takeAll() throws InterruptedException {
-        AtomicReference<Node> putStack = this.putStack;
-        for (; ; ) {
-            if (consumerThread.isInterrupted()) {
-                putStack.compareAndSet(BLOCKED, null);
-                throw new InterruptedException();
-            }
-
-            Node currentPutStackHead = putStack.get();
-
-            if (currentPutStackHead == null) {
-                // there is nothing to be take, so lets block.
-                if (!putStack.compareAndSet(null, BLOCKED)) {
-                    // we are lucky, something is available
-                    continue;
-                }
-
-                // lets block for real.
-                park();
-            } else if (currentPutStackHead == BLOCKED) {
-                park();
-            } else {
-                if (!putStack.compareAndSet(currentPutStackHead, null)) {
-                    continue;
-                }
-
-                copyIntoTakeStack(currentPutStackHead);
-                break;
-            }
-        }
-    }
-
-    private boolean drainPutStack() {
-        for (; ; ) {
-            Node head = putStack.get();
-            if (head == null) {
-                return false;
-            }
-
-            if (putStack.compareAndSet(head, null)) {
-                copyIntoTakeStack(head);
-                return true;
-            }
-        }
-    }
-
-    private void copyIntoTakeStack(Node putStackHead) {
-        int putStackSize = putStackHead.size;
-
-        takeStackSize.lazySet(putStackSize);
-
-        if (putStackSize > takeStack.length) {
-            takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
-        }
-
-        for (int i = putStackSize - 1; i >= 0; i--) {
-            takeStack[i] = putStackHead.item;
-            putStackHead = putStackHead.next;
-        }
-
-        takeStackIndex = 0;
-        assert takeStack[0] != null;
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * Best effort implementation.
-     */
-    @Override
-    public int size() {
-        Node h = putStack.get();
-        int putStackSize = h == null ? 0 : h.size;
-        return putStackSize + takeStackSize.get();
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return size() == 0;
-    }
-
-    @Override
-    public void put(E e) throws InterruptedException {
-        offer(e);
-    }
-
-    @Override
-    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
-        add(e);
-        return true;
-    }
-
-    @Override
-    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int remainingCapacity() {
-        return Integer.MAX_VALUE;
-    }
-
-    @Override
-    public int drainTo(Collection<? super E> c) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int drainTo(Collection<? super E> c, int maxElements) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterator<E> iterator() {
-        throw new UnsupportedOperationException();
-    }
-
-    private static final class Node<E> {
-        Node next;
-        E item;
-        int size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 56cefdf..d38b8f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -630,6 +630,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         assert nodeId != null;
         assert msg != null;
 
+        Lock busyLock0 = busyLock.readLock();
+
+        busyLock0.lock();
+
         try {
             if (stopping) {
                 if (log.isDebugEnabled())
@@ -716,6 +720,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to process message (will ignore): " + msg, e);
         }
+        finally {
+            busyLock0.unlock();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9ecd78e..a436d4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -1573,7 +1574,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).allNodes();
+        return resolveDiscoCache(CU.cacheId(null), topVer).allNodes();
     }
 
     /**
@@ -1581,7 +1582,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return All server nodes for given topology version.
      */
     public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).srvNodes;
+        return resolveDiscoCache(CU.cacheId(null), topVer).srvNodes;
     }
 
     /**
@@ -1592,7 +1593,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Node.
      */
     public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
-        return resolveDiscoCache(topVer).node(id);
+        return resolveDiscoCache(CU.cacheId(null), topVer).node(id);
     }
 
     /**
@@ -1603,7 +1604,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> cacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).cacheNodes(cacheName, topVer.topologyVersion());
+        return resolveDiscoCache(CU.cacheId(cacheName), topVer).cacheNodes(cacheName, topVer.topologyVersion());
     }
 
     /**
@@ -1614,7 +1615,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).cacheNodes(cacheId, topVer.topologyVersion());
+        return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId, topVer.topologyVersion()); // TODO
     }
 
     /**
@@ -1624,7 +1625,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).allNodesWithCaches(topVer.topologyVersion());
+        return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches(topVer.topologyVersion());
     }
 
     /**
@@ -1634,7 +1635,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).remoteCacheNodes(topVer.topologyVersion());
+        return resolveDiscoCache(CU.cacheId(null), topVer).remoteCacheNodes(topVer.topologyVersion());
     }
 
     /**
@@ -1642,7 +1643,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Oldest alive server nodes with at least one cache configured.
      */
     @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
-        DiscoCache cache = resolveDiscoCache(topVer);
+        DiscoCache cache = resolveDiscoCache(CU.cacheId(null), topVer);
 
         Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
 
@@ -1657,7 +1658,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache affinity nodes.
      */
     public Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).cacheAffinityNodes(CU.cacheId(cacheName), topVer.topologyVersion());
+        int cacheId = CU.cacheId(cacheName);
+
+        return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
     }
 
     /**
@@ -1668,7 +1671,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache affinity nodes.
      */
     public Collection<ClusterNode> cacheAffinityNodes(int cacheId, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
+        return resolveDiscoCache(cacheId, topVer).cacheAffinityNodes(cacheId, topVer.topologyVersion());
     }
 
     /**
@@ -1738,17 +1741,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Checks if cache with given name has at least one node with near cache enabled.
-     *
-     * @param cacheName Cache name.
-     * @param topVer Topology version.
-     * @return {@code True} if cache with given name has at least one node with near cache enabled.
-     */
-    public boolean hasNearCache(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).hasNearCache(CU.cacheId(cacheName));
-    }
-
-    /**
      * Checks if cache with given ID has at least one node with near cache enabled.
      *
      * @param cacheId Cache ID.
@@ -1756,23 +1748,28 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return {@code True} if cache with given name has at least one node with near cache enabled.
      */
     public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(topVer).hasNearCache(cacheId);
+        return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId);
     }
 
     /**
      * Gets discovery cache for given topology version.
      *
+     * @param cacheId Cache ID (participates in exception message).
      * @param topVer Topology version.
      * @return Discovery cache.
      */
-    private DiscoCache resolveDiscoCache(AffinityTopologyVersion topVer) {
+    private DiscoCache resolveDiscoCache(int cacheId, AffinityTopologyVersion topVer) {
         Snapshot snap = topSnap.get();
 
         DiscoCache cache = AffinityTopologyVersion.NONE.equals(topVer) || topVer.equals(snap.topVer) ?
             snap.discoCache : discoCacheHist.get(topVer);
 
         if (cache == null) {
-            throw new IgniteException("Failed to resolve nodes topology [topVer=" + topVer +
+            DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+
+            throw new IgniteException("Failed to resolve nodes topology [" +
+                "cacheName=" + (desc != null ? desc.cacheConfiguration().getName() : "N/A") +
+                ", topVer=" + topVer +
                 ", history=" + discoCacheHist.keySet() +
                 ", snap=" + snap +
                 ", locNode=" + ctx.discovery().localNode() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..144b162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -63,7 +63,7 @@ public class GridAffinityAssignmentCache {
     private final String cacheName;
 
     /** */
-    private final Integer cacheId;
+    private final int cacheId;
 
     /** Number of backups. */
     private final int backups;
@@ -169,7 +169,7 @@ public class GridAffinityAssignmentCache {
     /**
      * @return Cache ID.
      */
-    public Integer cacheId() {
+    public int cacheId() {
         return cacheId;
     }
 
@@ -266,7 +266,7 @@ public class GridAffinityAssignmentCache {
         List<ClusterNode> sorted;
 
         if (!locCache) {
-            sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
+            sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheId(), topVer));
 
             Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
         }
@@ -617,4 +617,4 @@ public class GridAffinityAssignmentCache {
             return S.toString(AffinityReadyFuture.class, this);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..7bf5fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -843,7 +843,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return true;
 
         // If local node did not initiate exchange or local node is the only cache node in grid.
-        Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheName(), fut.topologyVersion());
+        Collection<ClusterNode> affNodes = cctx.discovery().cacheAffinityNodes(aff.cacheId(), fut.topologyVersion());
 
         DynamicCacheDescriptor cacheDesc = registeredCaches.get(aff.cacheId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 68a8d1c..e414160 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,9 +288,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** Asynchronous operations limit semaphore. */
     private Semaphore asyncOpsSem;
 
-    /** */
-    protected volatile boolean asyncToggled;
-
     /** {@inheritDoc} */
     @Override public String name() {
         return cacheCfg.getName();
@@ -367,18 +364,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
-     * Toggles async flag if someone calls {@code withAsync()}
-     * on proxy and since that we have to properly handle all cache
-     * operations (sync and async) to put them in proper sequence.
-     *
-     * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
-     */
-    void toggleAsync() {
-        if (!asyncToggled)
-            asyncToggled = true;
-    }
-
-    /**
      * Prints memory stats.
      */
     public void printMemoryStats() {
@@ -4461,9 +4446,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Failed future if waiting was interrupted.
      */
     @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
-        if (!asyncToggled)
-            return null;
-
         try {
             if (asyncOpsSem != null)
                 asyncOpsSem.acquire();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index bdaf3a0..d20310b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -348,6 +349,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
+        Lock lock = rw.readLock();
+
+        lock.lock();
+
         try {
             if (stopping) {
                 if (log.isDebugEnabled())
@@ -376,6 +381,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         finally {
             if (depEnabled)
                 cctx.deploy().ignoreOwnership(false);
+
+            lock.unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3e157db..b0e25c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,14 +334,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteCache<K, V> withAsync() {
-        if (delegate instanceof GridCacheAdapter)
-            ((GridCacheAdapter)delegate).toggleAsync();
-
-        return super.withAsync();
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withSkipStore() {
         return skipStore();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 3f7fd0d..fe5be0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -217,7 +217,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyVersionFuture() {
-        // TODO
         assert topReadyFut != null;
 
         return topReadyFut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index ab8e863..a79e024 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -79,9 +79,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
         AffinityTopologyVersion topVer
     ) {
         this.ctx = ctx;
-        this.key = new T2<>(CU.cacheId(cacheName), topVer);
+        int cacheId = CU.cacheId(cacheName);
+        this.key = new T2<>(cacheId, topVer);
 
-        Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
+        Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheId, topVer);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -220,4 +221,4 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     @Override public String toString() {
         return S.toString(GridDhtAssignmentFetchFuture.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b130ed9..9f4a079 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -825,9 +825,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
-        if (CU.cheatCache(cctx.cacheId()))
-            return affNodes;
-
         lock.readLock().lock();
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ba0ea89..4809637 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1797,8 +1797,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             try {
                 GridDhtPartitionTopology top = topology();
 
-                if (!CU.cheatCache(ctx.cacheId()))
-                    top.readLock();
+                top.readLock();
 
                 try {
                     if (top.stopping()) {
@@ -1913,8 +1912,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         remap = true;
                 }
                 finally {
-                    if (!CU.cheatCache(ctx.cacheId()))
-                        top.readUnlock();
+                    top.readUnlock();
                 }
             }
             catch (GridCacheEntryRemovedException e) {
@@ -2915,9 +2913,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 try {
                     GridDhtCacheEntry entry = entryExx(key, topVer);
 
-                    if (CU.cheatCache(ctx.cacheId())) // TODO
-                        return Collections.singletonList(entry);
-
                     GridUnsafe.monitorEnter(entry);
 
                     if (entry.obsolete())
@@ -2993,9 +2988,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param topVer Topology version.
      */
     private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) {
-        if (CU.cheatCache(ctx.cacheId()))
-            return;
-
         // Process deleted entries before locks release.
         assert ctx.deferredDelete() : this;
 
@@ -3197,7 +3189,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
         if (msgLog.isDebugEnabled())
-            msgLog.debug("Received near atomic update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+            msgLog.debug("Received near atomic update response " +
+                "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
 
         res.nodeId(ctx.localNodeId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index da9cb40..133e42e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -382,60 +382,53 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion topVer;
         GridCacheVersion futVer;
 
-        if (!CU.cheatCache(cctx.cacheId())) {
-            cache.topology().readLock();
+        cache.topology().readLock();
 
-            try {
-                if (cache.topology().stopping()) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                        cache.name()));
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
 
-                    return;
-                }
+                return;
+            }
 
-                GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                if (fut.isDone()) {
-                    Throwable err = fut.validateCache(cctx);
+            if (fut.isDone()) {
+                Throwable err = fut.validateCache(cctx);
 
-                    if (err != null) {
-                        onDone(err);
+                if (err != null) {
+                    onDone(err);
 
-                        return;
-                    }
+                    return;
+                }
 
-                    topVer = fut.topologyVersion();
+                topVer = fut.topologyVersion();
 
-                    futVer = addAtomicFuture(topVer);
+                futVer = addAtomicFuture(topVer);
+            }
+            else {
+                if (waitTopFut) {
+                    assert !topLocked : this;
+
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
                 }
-                else {
-                    if (waitTopFut) {
-                        assert !topLocked : this;
-
-                        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                    @Override public void run() {
-                                        mapOnTopology();
-                                    }
-                                });
-                            }
-                        });
-                    }
-                    else
-                        onDone(new GridCacheTryPutFailedException());
+                else
+                    onDone(new GridCacheTryPutFailedException());
 
-                    return;
-                }
-            }
-            finally {
-                cache.topology().readUnlock();
+                return;
             }
         }
-        else {
-            topVer = cache.topology().topologyVersionFuture().topologyVersion();
-
-            futVer = addAtomicFuture(topVer);
+        finally {
+            cache.topology().readUnlock();
         }
 
         if (futVer != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9f6e761..da33fda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -491,60 +491,53 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion topVer;
         GridCacheVersion futVer;
 
-        if (!CU.cheatCache(cctx.cacheId())) {
-            cache.topology().readLock();
+        cache.topology().readLock();
 
-            try {
-                if (cache.topology().stopping()) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
-                        cache.name()));
+        try {
+            if (cache.topology().stopping()) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                    cache.name()));
 
-                    return;
-                }
+                return;
+            }
 
-                GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
-                if (fut.isDone()) {
-                    Throwable err = fut.validateCache(cctx);
+            if (fut.isDone()) {
+                Throwable err = fut.validateCache(cctx);
 
-                    if (err != null) {
-                        onDone(err);
+                if (err != null) {
+                    onDone(err);
 
-                        return;
-                    }
+                    return;
+                }
 
-                    topVer = fut.topologyVersion();
+                topVer = fut.topologyVersion();
 
-                    futVer = addAtomicFuture(topVer);
+                futVer = addAtomicFuture(topVer);
+            }
+            else {
+                if (waitTopFut) {
+                    assert !topLocked : this;
+
+                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    mapOnTopology();
+                                }
+                            });
+                        }
+                    });
                 }
-                else {
-                    if (waitTopFut) {
-                        assert !topLocked : this;
-
-                        fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                                    @Override public void run() {
-                                        mapOnTopology();
-                                    }
-                                });
-                            }
-                        });
-                    }
-                    else
-                        onDone(new GridCacheTryPutFailedException());
+                else
+                    onDone(new GridCacheTryPutFailedException());
 
-                    return;
-                }
-            }
-            finally {
-                cache.topology().readUnlock();
+                return;
             }
         }
-        else {
-            topVer = cache.topology().topologyVersionFuture().topologyVersion();
-
-            futVer = addAtomicFuture(topVer);
+        finally {
+            cache.topology().readUnlock();
         }
 
         if (futVer != null)
@@ -634,7 +627,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             updVer = this.updVer;
 
             if (updVer == null) {
-                //updVer = cctx.versions().next(topVer);
                 updVer = futVer;
 
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index eaf1c87..5df7c40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -514,8 +514,8 @@ public class IgniteTxHandler {
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 
-            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
-            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+            Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
+            Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
 
             if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
                 return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 36178f3..257d0d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -273,33 +273,31 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
      * @return Adjusted time.
      */
     public long adjustedTime(long topVer) {
-//        T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
-//
-//        GridClockDeltaSnapshot snap;
-//
-//        if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
-//            snap = fastSnap.get2();
-//        else {
-//            // Get last synchronized time on given topology version.
-//            Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
-//                new GridClockDeltaVersion(0, topVer + 1));
-//
-//            snap = entry == null ? null : entry.getValue();
-//        }
-//
-//        long now = clockSrc.currentTimeMillis();
-//
-//        if (snap == null)
-//            return now;
-//
-//        Long delta = snap.deltas().get(ctx.localNodeId());
-//
-//        if (delta == null)
-//            delta = 0L;
-//
-//        return now + delta;
-
-        return System.currentTimeMillis();
+        T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
+
+        GridClockDeltaSnapshot snap;
+
+        if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
+            snap = fastSnap.get2();
+        else {
+            // Get last synchronized time on given topology version.
+            Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
+                new GridClockDeltaVersion(0, topVer + 1));
+
+            snap = entry == null ? null : entry.getValue();
+        }
+
+        long now = clockSrc.currentTimeMillis();
+
+        if (snap == null)
+            return now;
+
+        Long delta = snap.deltas().get(ctx.localNodeId());
+
+        if (delta == null)
+            delta = 0L;
+
+        return now + delta;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 39dde09..a653429 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.client.util.MpscQueue;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -75,7 +74,7 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeMPSC(
+                stripes[i] = new StripeConcurrentQueue(
                     gridName,
                     poolName,
                     i,
@@ -476,45 +475,6 @@ public class StripedExecutor implements ExecutorService {
         }
     }
 
-    private static class StripeMPSC extends Stripe {
-        private final MpscQueue<Runnable> q = new MpscQueue<>();
-
-        public StripeMPSC(
-            String gridName,
-            String poolName,
-            int idx,
-            IgniteLogger log
-        ) {
-            super(
-                gridName,
-                poolName,
-                idx,
-                log);
-        }
-
-        @Override void start() {
-            super.start();
-
-            q.setConsumerThread(thread);
-        }
-
-        @Override void execute(Runnable cmd) {
-            q.offer(cmd);
-        }
-
-        @Override Runnable take() throws InterruptedException {
-            return q.take();
-        }
-
-        @Override int queueSize() {
-            return q.size();
-        }
-
-        @Override String queueToString() {
-            return q.toString();
-        }
-    }
-
     /**
      * Stripe.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b88631d/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
index 230a3bc..5601254 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerSelfTest.java
@@ -17,15 +17,8 @@
 
 package org.apache.ignite.internal.managers.discovery;
 
-import org.apache.ignite.Ignite;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -83,115 +76,6 @@ public abstract class GridDiscoveryManagerSelfTest extends GridCommonAbstractTes
     }
 
     /**
-     * @throws Exception If failed.
-     */
-    public void testHasNearCache() throws Exception {
-        IgniteKernal g0 = (IgniteKernal)startGrid(0); // PARTITIONED_ONLY cache.
-
-        AffinityTopologyVersion none = new AffinityTopologyVersion(-1);
-        AffinityTopologyVersion one = new AffinityTopologyVersion(1);
-        AffinityTopologyVersion two = new AffinityTopologyVersion(2, 2);
-        AffinityTopologyVersion three = new AffinityTopologyVersion(3);
-        AffinityTopologyVersion four = new AffinityTopologyVersion(4);
-        AffinityTopologyVersion five = new AffinityTopologyVersion(5);
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, none));
-        assertFalse(g0.context().discovery().hasNearCache(null, none));
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
-        assertFalse(g0.context().discovery().hasNearCache(null, one));
-
-        IgniteKernal g1 = (IgniteKernal)startGrid(1); // NEAR_ONLY cache.
-
-        grid(1).createNearCache(null, new NearCacheConfiguration());
-
-        grid(1).createNearCache(CACHE_NAME, new NearCacheConfiguration());
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertFalse(g0.context().discovery().hasNearCache(null, one));
-        assertTrue(g0.context().discovery().hasNearCache(null, two));
-
-        assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertTrue(g1.context().discovery().hasNearCache(null, two));
-
-        IgniteKernal g2 = (IgniteKernal)startGrid(2); // PARTITIONED_ONLY cache.
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertFalse(g0.context().discovery().hasNearCache(null, one));
-        assertTrue(g0.context().discovery().hasNearCache(null, two));
-        assertTrue(g0.context().discovery().hasNearCache(null, three));
-
-        assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertTrue(g1.context().discovery().hasNearCache(null, two));
-        assertTrue(g1.context().discovery().hasNearCache(null, three));
-
-        assertTrue(g2.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertTrue(g2.context().discovery().hasNearCache(null, three));
-
-        stopGrid(2);
-
-        // Wait all nodes are on version 4.
-        for (;;) {
-            if (F.forAll(
-                Ignition.allGrids(),
-                new IgnitePredicate<Ignite>() {
-                    @Override public boolean apply(Ignite ignite) {
-                        return ignite.cluster().topologyVersion() == 4;
-                    }
-                }))
-                break;
-
-            Thread.sleep(1000);
-        }
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
-        assertFalse(g0.context().discovery().hasNearCache(null, one));
-        assertTrue(g0.context().discovery().hasNearCache(null, two));
-        assertTrue(g0.context().discovery().hasNearCache(null, three));
-        assertTrue(g0.context().discovery().hasNearCache(null, four));
-
-        assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertTrue(g1.context().discovery().hasNearCache(CACHE_NAME, four));
-        assertTrue(g1.context().discovery().hasNearCache(null, three));
-        assertTrue(g1.context().discovery().hasNearCache(null, four));
-
-        stopGrid(1);
-
-        // Wait all nodes are on version 5.
-        for (;;) {
-            if (F.forAll(
-                Ignition.allGrids(),
-                new IgnitePredicate<Ignite>() {
-                    @Override public boolean apply(Ignite ignite) {
-                        return ignite.cluster().topologyVersion() == 5;
-                    }
-                }))
-                break;
-
-            Thread.sleep(1000);
-        }
-
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, one));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, two));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, three));
-        assertTrue(g0.context().discovery().hasNearCache(CACHE_NAME, four));
-        assertFalse(g0.context().discovery().hasNearCache(CACHE_NAME, five));
-
-        assertFalse(g0.context().discovery().hasNearCache(null, one));
-        assertTrue(g0.context().discovery().hasNearCache(null, two));
-        assertTrue(g0.context().discovery().hasNearCache(null, three));
-        assertTrue(g0.context().discovery().hasNearCache(null, four));
-        assertFalse(g0.context().discovery().hasNearCache(null, five));
-    }
-
-    /**
      *
      */
     public static class RegularDiscovery extends GridDiscoveryManagerSelfTest {


Mime
View raw message