ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [09/43] ignite git commit: IGNITE-2329: Implemented a bunch of optimizations: - Garbageless NIO Selector - Get rid of unnecessary ArrayList allocations in GridCacheMvccManager. - Optimized "force keys" futures logic.
Date Thu, 11 Feb 2016 23:27:44 GMT
IGNITE-2329: Implemented a bunch of optimizations:
- Garbageless NIO Selector
- Get rid of unnecessary ArrayList allocations in GridCacheMvccManager.
- Optimized "force keys" futures logic.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/75961eee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/75961eee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/75961eee

Branch: refs/heads/ignite-2249
Commit: 75961eee2513427d94a1c7e0dbb96ac46195544b
Parents: 4210989
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Fri Feb 5 21:13:26 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Feb 5 21:13:26 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +-
 .../processors/cache/GridCacheAdapter.java      |  37 +-
 .../processors/cache/GridCacheMvccManager.java  |  42 +-
 .../processors/cache/GridCachePreloader.java    |   6 +
 .../cache/GridCachePreloaderAdapter.java        |   5 +
 .../processors/cache/GridCacheUtils.java        |  21 +-
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  72 ++-
 .../distributed/dht/GridDhtEmbeddedFuture.java  |  13 +-
 .../cache/distributed/dht/GridDhtGetFuture.java | 176 ++++---
 .../distributed/dht/GridDhtGetSingleFuture.java | 476 +++++++++++++++++++
 .../distributed/dht/GridDhtLocalPartition.java  |  76 +--
 .../distributed/dht/GridDhtPartitionState.java  |   2 +-
 .../dht/GridDhtPartitionTopology.java           |   5 +
 .../dht/GridDhtPartitionTopologyImpl.java       |   9 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |   7 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  40 +-
 .../dht/preloader/GridDhtPreloader.java         |  16 +
 .../cache/distributed/near/GridNearTxLocal.java |   1 -
 .../IgniteCacheObjectProcessorImpl.java         |   2 +-
 .../util/future/GridCompoundFuture.java         |   2 +-
 .../ignite/internal/util/nio/GridNioServer.java | 143 +++++-
 .../util/nio/GridSelectorNioSessionImpl.java    |   2 +-
 .../util/nio/SelectedSelectionKeySet.java       | 111 +++++
 .../org/apache/ignite/lang/IgniteBiTuple.java   |   6 +-
 .../IgniteTxPreloadAbstractTest.java            |   2 +-
 .../near/GridCacheNearReadersSelfTest.java      |  19 +-
 .../apache/ignite/lang/GridTupleSelfTest.java   |  42 +-
 parent/pom.xml                                  |   1 +
 30 files changed, 1119 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index de7c10b..6f07702 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -370,11 +370,21 @@ public final class IgniteSystemProperties {
     /**
      * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for
      * {@link Serializable} classes.
-     * */
+     */
     public static final String IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID =
         "IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID";
 
     /**
+     * If set to {@code true}, then default selected keys set is used inside
+     * {@code GridNioServer} which leads to some extra garbage generation when
+     * processing selected keys.
+     * <p>
+     * Default value is {@code false}. Should be switched to {@code true} if there are
+     * any problems in communication layer.
+     */
+    public static final String IGNITE_NO_SELECTOR_OPTS = "IGNITE_NO_SELECTOR_OPTS";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9f54ddb..84eb0b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -101,7 +101,6 @@ import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1823,7 +1822,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @param needVer If {@code true} returns values as tuples containing value and version.
      * @return Future.
      */
-    public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final Collection<KeyCacheObject> keys,
+    public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
+        @Nullable final Collection<KeyCacheObject> keys,
         final boolean readThrough,
         boolean checkTx,
         @Nullable final UUID subjId,
@@ -1834,7 +1834,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean keepCacheObjects,
         boolean canRemap,
         final boolean needVer
-        ) {
+    ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
 
@@ -1853,11 +1853,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         if (tx == null || tx.implicit()) {
             try {
-                final AffinityTopologyVersion topVer = tx == null
-                    ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
-                    : tx.topologyVersion();
+                final AffinityTopologyVersion topVer = tx == null ?
+                    (canRemap ?
+                        ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+                        tx.topologyVersion();
+
+                int keysSize = keys.size();
 
-                final Map<K1, V1> map = new GridLeanMap<>(keys.size());
+                final Map<K1, V1> map = keysSize == 1 ?
+                    (Map<K1, V1>)new IgniteBiTuple<>() :
+                    U.<K1, V1>newHashMap(keysSize);
 
                 final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
 
@@ -1893,7 +1898,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                     GridCacheVersion ver = entry.version();
 
                                     if (misses == null)
-                                        misses = new GridLeanMap<>();
+                                        misses = new HashMap<>();
 
                                     misses.put(key, ver);
                                 }
@@ -1913,7 +1918,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
                                     ctx.evicts().touch(entry, topVer);
 
-                                if (keys.size() == 1)
+                                if (keysSize == 1)
                                     // Safe to return because no locks are required in READ_COMMITTED mode.
                                     return new GridFinishedFuture<>(map);
                             }
@@ -2051,17 +2056,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         }
                     );
                 }
-                else {
-                    // If misses is not empty and store is disabled, we should touch missed entries.
-                    if (misses != null) {
-                        for (KeyCacheObject key : misses.keySet()) {
-                            GridCacheEntryEx entry = peekEx(key);
-
-                            if (entry != null)
-                                ctx.evicts().touch(entry, topVer);
-                        }
-                    }
-                }
+                else
+                    // Misses can be non-null only if store is enabled.
+                    assert misses == null;
 
                 return new GridFinishedFuture<>(map);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c7d1f62..b2c23f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,6 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -52,18 +64,6 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -77,12 +77,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private static final int MAX_REMOVED_LOCKS = 10240;
 
     /** Pending locks per thread. */
-    private final ThreadLocal<LinkedList<GridCacheMvccCandidate>> pending =
-        new ThreadLocal<LinkedList<GridCacheMvccCandidate>>() {
-            @Override protected LinkedList<GridCacheMvccCandidate> initialValue() {
-                return new LinkedList<>();
-            }
-        };
+    private final ThreadLocal<Deque<GridCacheMvccCandidate>> pending = new ThreadLocal<>();
 
     /** Pending near local locks and topology version per thread. */
     private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
@@ -683,7 +678,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Remote candidates.
      */
     public Collection<GridCacheMvccCandidate> remoteCandidates() {
-        Collection<GridCacheMvccCandidate> rmtCands = new LinkedList<>();
+        Collection<GridCacheMvccCandidate> rmtCands = new ArrayList<>();
 
         for (GridDistributedCacheEntry entry : locked())
             rmtCands.addAll(entry.remoteMvccSnapshot());
@@ -697,7 +692,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Local candidates.
      */
     public Collection<GridCacheMvccCandidate> localCandidates() {
-        Collection<GridCacheMvccCandidate> locCands = new LinkedList<>();
+        Collection<GridCacheMvccCandidate> locCands = new ArrayList<>();
 
         for (GridDistributedCacheEntry entry : locked()) {
             try {
@@ -726,7 +721,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (cacheCtx.isNear() || cand.singleImplicit())
             return true;
 
-        LinkedList<GridCacheMvccCandidate> queue = pending.get();
+        Deque<GridCacheMvccCandidate> queue = pending.get();
+
+        if (queue == null)
+            pending.set(queue = new ArrayDeque<>());
 
         GridCacheMvccCandidate prev = null;
 
@@ -751,7 +749,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * Reset MVCC context.
      */
     public void contextReset() {
-        pending.set(new LinkedList<GridCacheMvccCandidate>());
+        pending.set(null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c8fcb90..be019fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -137,6 +137,12 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<Boolean> rebalanceFuture();
 
     /**
+     * @return {@code true} if force keys preloading is needed
+     *      (e.g. rebalancing has not been completed yet).
+     */
+    public boolean needForceKeys();
+
+    /**
      * Requests that preloader sends the request for the key.
      *
      * @param keys Keys to request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index a1704fc..5d98c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -93,6 +93,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean needForceKeys() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void onReconnected() {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 8723827..cd21794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -751,23 +751,28 @@ public class GridCacheUtils {
      * @param <T> Collection element type.
      * @return Reducer.
      */
-    public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer() {
+    public static <T> IgniteReducer<Collection<T>, Collection<T>> collectionsReducer(final int size) {
         return new IgniteReducer<Collection<T>, Collection<T>>() {
-            private final Collection<T> ret = new ConcurrentLinkedQueue<>();
+            private List<T> ret;
+
+            @Override public synchronized boolean collect(Collection<T> c) {
+                if (c == null)
+                    return true;
+
+                if (ret == null)
+                    ret = new ArrayList<>(size);
 
-            @Override public boolean collect(Collection<T> c) {
-                if (c != null)
-                    ret.addAll(c);
+                ret.addAll(c);
 
                 return true;
             }
 
-            @Override public Collection<T> reduce() {
-                return ret;
+            @Override public synchronized Collection<T> reduce() {
+                return ret == null ? Collections.<T>emptyList() : ret;
             }
 
             /** {@inheritDoc} */
-            @Override public String toString() {
+            @Override public synchronized String toString() {
                 return "Collection reducer: " + ret;
             }
         };

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index dcfc038..ad4943e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -336,6 +336,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void releasePartitions(int... parts) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public List<GridDhtLocalPartition> localPartitions() {
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5be4e72..8e456e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -698,7 +698,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean skipVals) {
+        boolean skipVals
+    ) {
         GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
             msgId,
             reader,
@@ -718,21 +719,63 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * @param nodeId Node ID.
+     * @param msgId Message ID.
+     * @param key Key.
+     * @param addRdr Add reader flag.
+     * @param readThrough Read through flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param expiry Expiry.
+     * @param skipVals Skip vals flag.
+     * @return Future for the operation.
+     */
+    private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+        UUID nodeId,
+        long msgId,
+        KeyCacheObject key,
+        boolean addRdr,
+        boolean readThrough,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean skipVals
+    ) {
+        GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+            ctx,
+            msgId,
+            nodeId,
+            key,
+            addRdr,
+            readThrough,
+            /*tx*/null,
+            topVer,
+            subjId,
+            taskNameHash,
+            expiry,
+            skipVals);
+
+        fut.init();
+
+        return fut;
+    }
+
+    /**
+     * @param nodeId Node ID.
      * @param req Get request.
      */
     protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
         assert ctx.affinityNode();
 
-        long ttl = req.accessTtl();
-
-        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
-
-        Map<KeyCacheObject, Boolean> map = Collections.singletonMap(req.key(), req.addReader());
+        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl());
 
-        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
-            getDhtAsync(nodeId,
+        IgniteInternalFuture<GridCacheEntryInfo> fut =
+            getDhtSingleAsync(
+                nodeId,
                 req.messageId(),
-                map,
+                req.key(),
+                req.addReader(),
                 req.readThrough(),
                 req.topologyVersion(),
                 req.subjectId(),
@@ -740,19 +783,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 expiryPlc,
                 req.skipValues());
 
-        fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
-            @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+        fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
+            @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {
                 GridNearSingleGetResponse res;
 
-                GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
-                    (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+                GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f;
 
                 try {
-                    Collection<GridCacheEntryInfo> entries = fut.get();
+                    GridCacheEntryInfo info = fut.get();
 
                     if (F.isEmpty(fut.invalidPartitions())) {
-                        GridCacheEntryInfo info = F.first(entries);
-
                         Message res0 = null;
 
                         if (info != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
index 0d10a93..1b9f743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiClosure;
 
@@ -32,10 +31,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Retries. */
-    @GridToStringInclude
-    private Collection<Integer> invalidParts;
-
     /**
      * @param c Closure.
      * @param embedded Embedded.
@@ -45,8 +40,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
         IgniteInternalFuture<B> embedded
     ) {
         super(c, embedded);
-
-        invalidParts = Collections.emptyList();
     }
 
     /**
@@ -58,17 +51,15 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implem
         IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c
     ) {
         super(embedded, c);
-
-        invalidParts = Collections.emptyList();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return invalidParts;
+        return Collections.emptyList();
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtEmbeddedFuture.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index c926c13..fa753b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -82,7 +83,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private Map<KeyCacheObject, Boolean> keys;
 
     /** Reserved partitions. */
-    private Collection<GridDhtLocalPartition> parts = new HashSet<>();
+    private int[] parts;
 
     /** Future ID. */
     private IgniteUuid futId;
@@ -137,7 +138,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals
     ) {
-        super(CU.<GridCacheEntryInfo>collectionsReducer());
+        super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size()));
 
         assert reader != null;
         assert !F.isEmpty(keys);
@@ -194,8 +195,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     @Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) {
         if (super.onDone(res, err)) {
             // Release all partitions reserved by this future.
-            for (GridDhtLocalPartition part : parts)
-                part.release();
+            if (parts != null)
+                cctx.topology().releasePartitions(parts);
 
             return true;
         }
@@ -209,68 +210,92 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     private void map(final Map<KeyCacheObject, Boolean> keys) {
         GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
 
-        if (!F.isEmpty(fut.invalidPartitions())) {
-            if (retries == null)
-                retries = new HashSet<>();
+        if (fut != null) {
+            if (!F.isEmpty(fut.invalidPartitions())) {
+                if (retries == null)
+                    retries = new HashSet<>();
 
-            retries.addAll(fut.invalidPartitions());
-        }
+                retries.addAll(fut.invalidPartitions());
+            }
 
-        add(new GridEmbeddedFuture<>(
-            new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
-                    if (e != null) { // Check error first.
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
+            add(new GridEmbeddedFuture<>(
+                new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
+                    @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
+                        if (e != null) { // Check error first.
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
 
-                        onDone(e);
+                            onDone(e);
+                        }
+                        else
+                            map0(keys);
+
+                        // Finish this one.
+                        return Collections.emptyList();
                     }
+                },
+                fut));
+        }
+        else
+            map0(keys);
+    }
 
-                    Map<KeyCacheObject, Boolean> mappedKeys = null;
+    /**
+     * @param keys Keys to map.
+     */
+    private void map0(Map<KeyCacheObject, Boolean> keys) {
+        Map<KeyCacheObject, Boolean> mappedKeys = null;
 
-                    // Assign keys to primary nodes.
-                    for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
-                        int part = cctx.affinity().partition(key.getKey());
+        // Assign keys to primary nodes.
+        for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
+            int part = cctx.affinity().partition(key.getKey());
 
-                        if (retries == null || !retries.contains(part)) {
-                            if (!map(key.getKey(), parts)) {
-                                if (retries == null)
-                                    retries = new HashSet<>();
+            if (retries == null || !retries.contains(part)) {
+                if (!map(key.getKey())) {
+                    if (retries == null)
+                        retries = new HashSet<>();
 
-                                retries.add(part);
+                    retries.add(part);
 
-                                if (mappedKeys == null) {
-                                    mappedKeys = U.newLinkedHashMap(keys.size());
+                    if (mappedKeys == null) {
+                        mappedKeys = U.newLinkedHashMap(keys.size());
 
-                                    for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) {
-                                        if (key1.getKey() == key.getKey())
-                                            break;
+                        for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) {
+                            if (key1.getKey() == key.getKey())
+                                break;
 
-                                        mappedKeys.put(key.getKey(), key1.getValue());
-                                    }
-                                }
-                            }
-                            else if (mappedKeys != null)
-                                mappedKeys.put(key.getKey(), key.getValue());
+                            mappedKeys.put(key.getKey(), key1.getValue());
                         }
                     }
+                }
+                else if (mappedKeys != null)
+                    mappedKeys.put(key.getKey(), key.getValue());
+            }
+        }
 
-                    // Add new future.
-                    add(getAsync(mappedKeys == null ? keys : mappedKeys));
+        // Add new future.
+        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getAsync(mappedKeys == null ? keys : mappedKeys);
 
-                    // Finish this one.
-                    return Collections.emptyList();
-                }
-            },
-            fut));
+        // Optimization to avoid going through compound future,
+        // if getAsync() has been completed and no other futures added to this
+        // compound future.
+        if (fut.isDone() && futuresSize() == 0) {
+            if (fut.error() != null)
+                onDone(fut.error());
+            else
+                onDone(fut.result());
+
+            return;
+        }
+
+        add(fut);
     }
 
     /**
      * @param key Key.
-     * @param parts Parts to map.
      * @return {@code True} if mapped.
      */
-    private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) {
+    private boolean map(KeyCacheObject key) {
         GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
             cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
             cache().topology().localPartition(key, false);
@@ -278,10 +303,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         if (part == null)
             return false;
 
-        if (!parts.contains(part)) {
+        if (parts == null || !F.contains(parts, part.id())) {
             // By reserving, we make sure that partition won't be unloaded while processed.
             if (part.reserve()) {
-                parts.add(part);
+                parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
+
+                parts[parts.length - 1] = part.id();
 
                 return true;
             }
@@ -422,37 +449,56 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             );
         }
 
+        if (fut.isDone()) {
+            if (fut.error() != null)
+                onDone(fut.error());
+            else
+                return new GridFinishedFuture<>(toEntryInfos(fut.result()));
+        }
+
         return new GridEmbeddedFuture<>(
             new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() {
-                @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) {
+                @Override public Collection<GridCacheEntryInfo> apply(
+                    Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e
+                ) {
                     if (e != null) {
                         onDone(e);
 
                         return Collections.emptyList();
                     }
-                    else {
-                        Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
+                    else
+                        return toEntryInfos(map);
+                }
+            },
+            fut);
+    }
 
-                        for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
-                            T2<CacheObject, GridCacheVersion> val = entry.getValue();
+    /**
+     * @param map Map to convert.
+     * @return List of infos.
+     */
+    private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+        if (map.isEmpty())
+            return Collections.emptyList();
 
-                            assert val != null;
+        Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
 
-                            GridCacheEntryInfo info = new GridCacheEntryInfo();
+        for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) {
+            T2<CacheObject, GridCacheVersion> val = entry.getValue();
 
-                            info.cacheId(cctx.cacheId());
-                            info.key(entry.getKey());
-                            info.value(skipVals ? null : val.get1());
-                            info.version(val.get2());
+            assert val != null;
 
-                            infos.add(info);
-                        }
+            GridCacheEntryInfo info = new GridCacheEntryInfo();
 
-                        return infos;
-                    }
-                }
-            },
-            fut);
+            info.cacheId(cctx.cacheId());
+            info.key(entry.getKey());
+            info.value(skipVals ? null : val.get1());
+            info.version(val.get2());
+
+            infos.add(info);
+        }
+
+        return infos;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
new file mode 100644
index 0000000..d9851c7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo>
+    implements GridDhtFuture<GridCacheEntryInfo> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger (shared by all instances, initialized on first construction). */
+    private static IgniteLogger log;
+
+    /** Message ID. */
+    private long msgId;
+
+    /** Reader node ID. */
+    private UUID reader;
+
+    /** Read through flag. */
+    private boolean readThrough;
+
+    /** Context. */
+    private GridCacheContext<K, V> cctx;
+
+    /** Key. */
+    private KeyCacheObject key;
+
+    /** Add reader flag. */
+    private boolean addRdr;
+
+    /** ID of the partition reserved by this future, or {@code -1} if nothing is reserved. */
+    private int part = -1;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Transaction. */
+    private IgniteTxLocalEx tx;
+
+    /** Partitions to retry because ownership changed (returned by {@code invalidPartitions()}). */
+    private Collection<Integer> retries;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash code. */
+    private int taskNameHash;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Skip values flag. */
+    private boolean skipVals;
+
+    /**
+     * Creates a DHT get future for a single key. The future is not started here;
+     * processing begins when {@code init()} is called.
+     *
+     * @param cctx Context.
+     * @param msgId Message ID.
+     * @param reader Reader (must not be {@code null}).
+     * @param key Key (must not be {@code null}).
+     * @param addRdr Add reader flag.
+     * @param readThrough Read through flag.
+     * @param tx Transaction.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     */
+    public GridDhtGetSingleFuture(
+        GridCacheContext<K, V> cctx,
+        long msgId,
+        UUID reader,
+        KeyCacheObject key,
+        boolean addRdr,
+        boolean readThrough,
+        @Nullable IgniteTxLocalEx tx,
+        @NotNull AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals
+    ) {
+        assert reader != null;
+        assert key != null;
+
+        this.reader = reader;
+        this.cctx = cctx;
+        this.msgId = msgId;
+        this.key = key;
+        // Primitive parameter: the flag is stored in a primitive field, so a boxed value
+        // would only add an unboxing step that fails on null.
+        this.addRdr = addRdr;
+        this.readThrough = readThrough;
+        this.tx = tx;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+
+        futId = IgniteUuid.randomUuid();
+
+        // Outside of a transaction a new version is generated; otherwise the transaction's XID version is used.
+        ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+
+        // The logger is static, so it is resolved only by the first instance that finds it unset.
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
+    }
+
+    /**
+     * Starts processing of this future by mapping the key.
+     */
+    void init() {
+        this.map();
+    }
+
+    /**
+     * @return ID assigned to this future when it was created.
+     */
+    public IgniteUuid futureId() {
+        return this.futId;
+    }
+
+    /**
+     * @return Version of this future: the transaction's XID version if a transaction was given,
+     *      otherwise the version generated when the future was created.
+     */
+    public GridCacheVersion version() {
+        return this.ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) {
+        if (!super.onDone(res, err))
+            return false;
+
+        // This call completed the future, so give back the partition
+        // reservation taken by this future (if any).
+        if (part != -1)
+            cctx.topology().releasePartitions(part);
+
+        return true;
+    }
+
+    /**
+     * Starts mapping of the key. If the preloader needs force keys requests, the key is requested
+     * from the preloader first and the actual mapping continues once that request completes.
+     * Partitions the preloader reports as invalid are recorded for retry and the future is finished
+     * without an entry.
+     */
+    private void map() {
+        if (cctx.dht().dhtPreloader().needForceKeys()) {
+            GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+                Collections.singleton(key),
+                topVer);
+
+            if (fut != null) {
+                // Record invalid partitions only when there are some (the check used to be inverted,
+                // so reported partitions were silently dropped).
+                if (!F.isEmpty(fut.invalidPartitions())) {
+                    if (retries == null)
+                        retries = new HashSet<>();
+
+                    retries.addAll(fut.invalidPartitions());
+
+                    // Nothing is reserved for an invalid partition, so the read must not start:
+                    // finish with no entry and expose the partition through invalidPartitions(),
+                    // same as the failed-reservation path in map0().
+                    onDone((GridCacheEntryInfo)null);
+
+                    return;
+                }
+
+                fut.listen(
+                    new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                        @Override public void apply(IgniteInternalFuture<Object> fut) {
+                            Throwable e = fut.error();
+
+                            if (e != null) { // Check error first.
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to request keys from preloader " +
+                                        "[keys=" + key + ", err=" + e + ']');
+
+                                onDone(e);
+                            }
+                            else
+                                map0();
+                        }
+                    }
+                );
+
+                return;
+            }
+        }
+
+        map0();
+    }
+
+    /**
+     * Reserves the key's partition, unless it is already listed for retry, and then starts the read.
+     * If the partition cannot be reserved, it is recorded for retry and the future finishes with no entry.
+     */
+    private void map0() {
+        int keyPart = cctx.affinity().partition(key);
+
+        boolean needReserve = retries == null || !retries.contains(keyPart);
+
+        if (needReserve && !map(key)) {
+            retries = Collections.singleton(keyPart);
+
+            onDone((GridCacheEntryInfo)null);
+
+            return;
+        }
+
+        getAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> invalidPartitions() {
+        if (retries != null)
+            return retries;
+
+        return Collections.<Integer>emptyList();
+    }
+
+    /**
+     * Looks up the local partition for the key and reserves it for this future.
+     *
+     * @param key Key.
+     * @return {@code True} if the partition was found and reserved.
+     */
+    private boolean map(KeyCacheObject key) {
+        GridDhtLocalPartition locPart;
+
+        if (topVer.topologyVersion() > 0)
+            locPart = cache().topology().localPartition(cctx.affinity().partition(key), topVer, true);
+        else
+            locPart = cache().topology().localPartition(key, false);
+
+        if (locPart == null)
+            return false;
+
+        assert part == -1;
+
+        // By reserving, we make sure that partition won't be unloaded while processed.
+        if (!locPart.reserve())
+            return false;
+
+        part = locPart.id();
+
+        return true;
+    }
+
+    /**
+     * Reads the key and completes this future with the result. The read goes through
+     * {@code getDhtAllAsync} of the DHT cache when there is no transaction, and through the
+     * transaction's {@code getAllAsync} otherwise. If the reader node is known, is not local and
+     * passes the {@code cacheNearNode} check for this cache, the reader may first be registered on
+     * the entry; in that case the read is deferred until the registration future completes.
+     * Expects the partition to be reserved already ({@code part != -1}).
+     */
+    @SuppressWarnings( {"unchecked", "IfMayBeConditional"})
+    private void getAsync() {
+        assert part != -1;
+
+        // Task name: taken from the current job if available, otherwise resolved from the hash.
+        String taskName0 = cctx.kernalContext().job().currentTaskName();
+
+        if (taskName0 == null)
+            taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
+
+        final String taskName = taskName0;
+
+        // Stays null when no reader registration is performed.
+        IgniteInternalFuture<Boolean> rdrFut = null;
+
+        ClusterNode readerNode = cctx.discovery().node(reader);
+
+        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+            // Loop until the entry is processed without being obsolete or removed.
+            while (true) {
+                GridDhtCacheEntry e = cache().entryExx(key, topVer);
+
+                try {
+                    // Obsolete entry: retry the lookup (the finally block still touches this one).
+                    if (e.obsolete())
+                        continue;
+
+                    // Reader is added only for non-deleted entries, when requested and values are not skipped.
+                    boolean addReader = (!e.deleted() && addRdr && !skipVals);
+
+                    if (addReader)
+                        e.unswap(false);
+
+                    // Register reader. If there are active transactions for this entry,
+                    // then will wait for their completion before proceeding.
+                    // TODO: GG-4003:
+                    // TODO: What if any transaction we wait for actually removes this entry?
+                    // TODO: In this case seems like we will be stuck with untracked near entry.
+                    // TODO: To fix, check that reader is contained in the list of readers once
+                    // TODO: again after the returned future completes - if not, try again.
+                    rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null;
+
+                    break;
+                }
+                catch (IgniteCheckedException err) {
+                    // Fail the whole future; the read is not attempted.
+                    onDone(err);
+
+                    return;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    // Entry was removed concurrently: fall through and retry the loop.
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry when getting a DHT value: " + e);
+                }
+                finally {
+                    cctx.evicts().touch(e, topVer);
+                }
+            }
+        }
+
+        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
+
+        // No reader registration pending: start the read right away.
+        if (rdrFut == null || rdrFut.isDone()) {
+            if (tx == null) {
+                fut = cache().getDhtAllAsync(
+                    Collections.singleton(key),
+                    readThrough,
+                    subjId,
+                    taskName,
+                    expiryPlc,
+                    skipVals,
+                    /*can remap*/true);
+            }
+            else {
+                fut = tx.getAllAsync(cctx,
+                    Collections.singleton(key),
+                    /*deserialize binary*/false,
+                    skipVals,
+                    /*keep cache objects*/true,
+                    /*skip store*/!readThrough,
+                    false);
+            }
+        }
+        else {
+            // Reader registration is still in progress: start the same read from its listener.
+            rdrFut.listen(
+                new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        Throwable e = fut.error();
+
+                        if (e != null) {
+                            onDone(e);
+
+                            return;
+                        }
+
+                        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut0;
+
+                        if (tx == null) {
+                            fut0 = cache().getDhtAllAsync(
+                                Collections.singleton(key),
+                                readThrough,
+                                subjId,
+                                taskName,
+                                expiryPlc,
+                                skipVals,
+                                /*can remap*/true);
+                        }
+                        else {
+                            fut0 = tx.getAllAsync(cctx,
+                                Collections.singleton(key),
+                                /*deserialize binary*/false,
+                                skipVals,
+                                /*keep cache objects*/true,
+                                /*skip store*/!readThrough,
+                                false
+                            );
+                        }
+
+                        fut0.listen(createGetFutureListener());
+                    }
+                }
+            );
+
+            return;
+        }
+
+        // Already completed read is handled inline, without creating a listener.
+        if (fut.isDone())
+            onResult(fut);
+        else
+            fut.listen(createGetFutureListener());
+    }
+
+    /**
+     * Creates a listener that passes a completed get future to {@code onResult}.
+     *
+     * @return Listener for get future.
+     */
+    @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>
+    createGetFutureListener() {
+        IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>> lsnr =
+            new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() {
+                @Override public void apply(
+                    IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getFut
+                ) {
+                    onResult(getFut);
+                }
+            };
+
+        return lsnr;
+    }
+
+    /**
+     * Finishes this future with the outcome of the given get future: its error if it failed,
+     * otherwise the entry info built from its result.
+     *
+     * @param fut Completed future to finish this process with.
+     */
+    private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) {
+        assert fut.isDone();
+
+        Throwable err = fut.error();
+
+        if (err != null) {
+            onDone(err);
+
+            return;
+        }
+
+        try {
+            onDone(toEntryInfo(fut.get()));
+        }
+        catch (IgniteCheckedException e) {
+            // Not expected for a future that reported no error. Still complete with the error:
+            // swallowing it would leave this future unfinished and its partition reservation
+            // unreleased whenever assertions are disabled.
+            onDone(e);
+        }
+    }
+
+    /**
+     * Converts the lookup result for this future's key into a single entry info.
+     *
+     * @param map Map to convert.
+     * @return Entry info for the key, or {@code null} if the map is empty.
+     */
+    private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+        if (map.isEmpty())
+            return null;
+
+        T2<CacheObject, GridCacheVersion> valAndVer = map.get(key);
+
+        assert valAndVer != null;
+
+        GridCacheEntryInfo res = new GridCacheEntryInfo();
+
+        res.cacheId(cctx.cacheId());
+        res.key(key);
+
+        // Value is omitted when values are skipped; the version is always set.
+        if (skipVals)
+            res.value((CacheObject)null);
+        else
+            res.value(valAndVer.get1());
+
+        res.version(valAndVer.get2());
+
+        return res;
+    }
+
+    /**
+     * @return Cache of this future's context, cast to the DHT cache adapter.
+     */
+    private GridDhtCacheAdapter<K, V> cache() {
+        GridDhtCacheAdapter<K, V> dht = (GridDhtCacheAdapter<K, V>)cctx.cache();
+
+        return dht;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c4312b5..4fc1eaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -83,8 +82,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
     /** State. */
     @GridToStringExclude
-    private final AtomicStampedReference<GridDhtPartitionState> state =
-        new AtomicStampedReference<>(MOVING, 0);
+    private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32);
 
     /** Rent future. */
     @GridToStringExclude
@@ -153,8 +151,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return {@code false} If such reservation already added.
      */
     public boolean addReservation(GridDhtPartitionsReservation r) {
-        assert state.getReference() != EVICTED : "we can reserve only active partitions";
-        assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation";
+        assert GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32)) != EVICTED :
+            "we can reserve only active partitions";
+        assert (state.get() & 0xFFFF) != 0 : "partition must be already reserved before adding group reservation";
 
         return reservations.addIfAbsent(r);
     }
@@ -185,14 +184,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return Partition state.
      */
     public GridDhtPartitionState state() {
-        return state.getReference();
+        return GridDhtPartitionState.fromOrdinal((int)(state.get() >> 32));
     }
 
     /**
      * @return Reservations.
      */
     public int reservations() {
-        return state.getStamp();
+        return (int)(state.get() & 0xFFFF);
     }
 
     /**
@@ -385,14 +384,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     @Override public boolean reserve() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
-
-            if (s == EVICTED)
+            if ((int)(reservations >> 32) == EVICTED.ordinal())
                 return false;
 
-            if (state.compareAndSet(s, s, reservations, reservations + 1))
+            if (state.compareAndSet(reservations, reservations + 1))
                 return true;
         }
     }
@@ -402,17 +399,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     @Override public void release() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            if (reservations == 0)
+            if ((int)(reservations & 0xFFFF) == 0)
                 return;
 
-            GridDhtPartitionState s = state.getReference();
-
-            assert s != EVICTED;
+            assert (int)(reservations >> 32) != EVICTED.ordinal();
 
             // Decrement reservations.
-            if (state.compareAndSet(s, s, reservations, --reservations)) {
+            if (state.compareAndSet(reservations, --reservations)) {
                 tryEvict();
 
                 break;
@@ -421,23 +416,32 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @param reservations Current aggregated value.
+     * @param toState State to switch to.
+     * @return {@code true} if cas succeeds.
+     */
+    private boolean casState(long reservations, GridDhtPartitionState toState) {
+        // New aggregated value: target state ordinal in the high 32 bits,
+        // current value masked with 0xFFFF in the low bits.
+        long stateBits = (long)toState.ordinal() << 32;
+        long rsrvBits = reservations & 0xFFFF;
+
+        return state.compareAndSet(reservations, rsrvBits | stateBits);
+    }
+
+    /**
      * @return {@code True} if transitioned to OWNING state.
      */
     boolean own() {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
+            int ord = (int)(reservations >> 32);
 
-            if (s == RENTING || s == EVICTED)
+            if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
                 return false;
 
-            if (s == OWNING)
+            if (ord == OWNING.ordinal())
                 return true;
 
-            assert s == MOVING;
+            assert ord == MOVING.ordinal();
 
-            if (state.compareAndSet(MOVING, OWNING, reservations, reservations)) {
+            if (casState(reservations, OWNING)) {
                 if (log.isDebugEnabled())
                     log.debug("Owned partition: " + this);
 
@@ -455,14 +459,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     IgniteInternalFuture<?> rent(boolean updateSeq) {
         while (true) {
-            int reservations = state.getStamp();
+            long reservations = state.get();
 
-            GridDhtPartitionState s = state.getReference();
+            int ord = (int)(reservations >> 32);
 
-            if (s == RENTING || s == EVICTED)
+            if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
                 return rent;
 
-            if (state.compareAndSet(s, RENTING, reservations, reservations)) {
+            if (casState(reservations, RENTING)) {
                 if (log.isDebugEnabled())
                     log.debug("Moved partition to RENTING state: " + this);
 
@@ -481,9 +485,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @param updateSeq Update sequence.
      */
     void tryEvictAsync(boolean updateSeq) {
+        long reservations = state.get();
+
+        int ord = (int)(reservations >> 32);
+
         if (map.isEmpty() && !GridQueryProcessor.isEnabled(cctx.config()) &&
-            state.getReference() == RENTING && state.getStamp() == 0 &&
-            state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+            ord == RENTING.ordinal() && (reservations & 0xFFFF) == 0 &&
+            casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 
@@ -520,13 +528,17 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     public void tryEvict() {
-        if (state.getReference() != RENTING || state.getStamp() != 0 || groupReserved())
+        long reservations = state.get();
+
+        int ord = (int)(reservations >> 32);
+
+        if (ord != RENTING.ordinal() || (reservations & 0xFFFF) != 0 || groupReserved())
             return;
 
         // Attempt to evict partition entries from cache.
         clearAll();
 
-        if (map.isEmpty() && state.compareAndSet(RENTING, EVICTED, 0, 0)) {
+        if (map.isEmpty() && casState(reservations, EVICTED)) {
             if (log.isDebugEnabled())
                 log.debug("Evicted partition: " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
index 7b49369..041f135 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionState.java
@@ -52,4 +52,4 @@ public enum GridDhtPartitionState {
     public boolean active() {
         return this != EVICTED;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index dd06d6f..84889f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -112,6 +112,11 @@ public interface GridDhtPartitionTopology {
         throws GridDhtInvalidPartitionException;
 
     /**
+     * @param parts Partitions to release (should be reserved before).
+     */
+    public void releasePartitions(int... parts);
+
+    /**
      * @param key Cache key.
      * @param create If {@code true}, then partition will be created if it's not there.
      * @return Local partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d6fc8f1..0e579ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -612,6 +612,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public void releasePartitions(int... parts) {
+        assert parts != null;
+        assert parts.length > 0;
+
+        // Release every listed partition, in the order given.
+        for (int partId : parts)
+            locParts.get(partId).release();
+    }
+
+    /** {@inheritDoc} */
     @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
         return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 41b28d5..4c783f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -988,7 +988,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
             lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
 
-            if (compFut != null)
+            if (compFut != null && lastForceFut != null)
                 compFut.add(lastForceFut);
         }
 
@@ -997,11 +997,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
             return compFut;
         }
-        else {
-            assert lastForceFut != null;
-
+        else
             return lastForceFut;
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f6f57ee..6c7bac5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1309,7 +1309,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
-        if (forceFut.isDone())
+        if (forceFut == null || forceFut.isDone())
             updateAllAsyncInternal0(nodeId, req, completionCb);
         else {
             forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index dc4b6bd..1a2eb22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -897,28 +897,24 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
 
         // Prevent embedded future creation if possible.
-        if (keyFut.isDone()) {
-            try {
-                // Check for exception.
-                keyFut.get();
-
-                return lockAllAsync0(cacheCtx,
-                    tx,
-                    threadId,
-                    ver,
-                    topVer,
-                    keys,
-                    txRead,
-                    retval,
-                    timeout,
-                    accessTtl,
-                    filter,
-                    skipStore,
-                    keepBinary);
-            }
-            catch (IgniteCheckedException e) {
-                return new GridFinishedFuture<>(e);
-            }
+        if (keyFut == null || keyFut.isDone()) {
+            // Check for exception.
+            if (keyFut != null && keyFut.error() != null)
+                return new GridFinishedFuture<>(keyFut.error());
+
+            return lockAllAsync0(cacheCtx,
+                tx,
+                threadId,
+                ver,
+                topVer,
+                keys,
+                txRead,
+                retval,
+                timeout,
+                accessTtl,
+                filter,
+                skipStore,
+                keepBinary);
         }
         else {
             return new GridEmbeddedFuture<>(keyFut,

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index f0054e4..6ec02a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -403,6 +403,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         try {
             demandLock.readLock().lock();
+
             try {
                 demander.handleSupplyMessage(idx, id, s);
             }
@@ -692,12 +693,27 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         }
     }
 
+    /** {@inheritDoc} Returns {@code false} only when rebalancing is enabled and has completed with {@code TRUE}. */
+    @Override public boolean needForceKeys() {
+        if (cctx.rebalanceEnabled()) {
+            IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
+
+            if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result())) // Null-safe comparison.
+                return false;
+        }
+
+        return true; // Rebalancing is disabled, not done yet, or did not finish with TRUE.
+    }
+
     /**
      * @param keys Keys to request.
      * @return Future for request.
      */
     @SuppressWarnings( {"unchecked", "RedundantCast"})
     @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
+        if (!needForceKeys())
+            return null;
+
         final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b7b480e..0853b77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 54dd69e..2e825b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -302,7 +302,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
                     ClassLoader ldr = ctx.p2pEnabled() ?
                         IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
 
-                     Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+                    Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
 
                     return new KeyCacheObjectImpl(val, valBytes);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index c382497..3409341 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -258,7 +258,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
     /**
      * @return Futures size.
      */
-    private int futuresSize() {
+    protected int futuresSize() {
         synchronized (futs) {
             return futs.size();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c7679c0..75fa9f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -43,10 +44,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -102,6 +103,24 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
+    /** Read from {@code IGNITE_NO_SELECTOR_OPTS}; when {@code true} selectors are not instrumented with a key set. */
+    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
+
+    /**
+     * Opens and immediately closes a throwaway selector once, at class initialization (see the workaround note below).
+     */
+    static {
+        // This is a workaround for JDK bug (NPE in Selector.open()).
+        // http://bugs.sun.com/view_bug.do?bug_id=6427854
+        try {
+            Selector.open().close();
+        }
+        catch (IOException ignored) {
+            // No-op: the throwaway selector is never used, so an I/O failure here is ignored.
+        }
+    }
+
     /** Accept worker thread. */
     @GridToStringExclude
     private final IgniteThread acceptThread;
@@ -184,17 +203,6 @@ public class GridNioServer<T> {
     /** Optional listener to monitor outbound message queue size. */
     private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
-    /** Static initializer ensures single-threaded execution of workaround. */
-    static {
-        // This is a workaround for JDK bug (NPE in Selector.open()).
-        // http://bugs.sun.com/view_bug.do?bug_id=6427854
-        try {
-            Selector.open().close();
-        }
-        catch (IOException ignored) {
-        }
-    }
-
     /**
      * @param addr Address.
      * @param port Port.
@@ -445,10 +453,8 @@ public class GridNioServer<T> {
             // Change from 0 to 1 means that worker thread should be waken up.
             clientWorkers.get(ses.selectorIndex()).offer(fut);
 
-        IgniteBiInClosure<GridNioSession, Integer> lsnr0 = msgQueueLsnr;
-
-        if (lsnr0 != null)
-            lsnr0.apply(ses, msgCnt);
+        if (msgQueueLsnr != null)
+            msgQueueLsnr.apply(ses, msgCnt);
     }
 
     /**
@@ -1239,6 +1245,9 @@ public class GridNioServer<T> {
         /** Selector to select read events. */
         private Selector selector;
 
+        /** Selected keys. */
+        private SelectedSelectionKeySet selectedKeys;
+
         /** Worker index. */
         private final int idx;
 
@@ -1253,7 +1262,7 @@ public class GridNioServer<T> {
             throws IgniteCheckedException {
             super(gridName, name, log);
 
-            selector = createSelector(null);
+            createSelector();
 
             this.idx = idx;
         }
@@ -1262,10 +1271,11 @@ public class GridNioServer<T> {
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             try {
                 boolean reset = false;
+
                 while (!closed) {
                     try {
                         if (reset)
-                            selector = createSelector(null);
+                            createSelector();
 
                         bodyInternal();
                     }
@@ -1290,6 +1300,50 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void createSelector() throws IgniteCheckedException {
+            selectedKeys = null; // Null makes the select loop fall back to selector.selectedKeys().
+
+            selector = GridNioServer.this.createSelector(null);
+
+            if (DISABLE_KEYSET_OPTIMIZATION) // Optimization switched off: keep the selector as created.
+                return;
+
+            try {
+                SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
+
+                Class<?> selectorImplClass =
+                    Class.forName("sun.nio.ch.SelectorImpl", false, U.gridClassLoader());
+
+                // Ensure the current selector implementation is what we can instrument.
+                if (!selectorImplClass.isAssignableFrom(selector.getClass()))
+                    return;
+
+                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
+                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
+
+                selectedKeysField.setAccessible(true);
+                publicSelectedKeysField.setAccessible(true);
+
+                selectedKeysField.set(selector, selectedKeySet); // Both fields receive the same set instance.
+                publicSelectedKeysField.set(selector, selectedKeySet);
+
+                selectedKeys = selectedKeySet; // Published only after both reflective writes succeeded.
+
+                if (log.isDebugEnabled())
+                    log.debug("Instrumented an optimized java.util.Set into: " + selector);
+            }
+            catch (Exception e) {
+                selectedKeys = null; // Any failure above selects the non-optimized processing path.
+                // NOTE(review): if only the first set() succeeded, the selector stays half-instrumented -- confirm.
+                if (log.isDebugEnabled())
+                    log.debug("Failed to instrument an optimized java.util.Set into selector [selector=" + selector
+                        + ", err=" + e + ']');
+            }
+        }
+
+        /**
          * Adds socket channel to the registration queue and wakes up reading thread.
          *
          * @param req Change request.
@@ -1385,7 +1439,10 @@ public class GridNioServer<T> {
                     // Wake up every 2 seconds to check if closed.
                     if (selector.select(2000) > 0) {
                         // Walk through the ready keys collection and process network events.
-                        processSelectedKeys(selector.selectedKeys());
+                        if (selectedKeys == null)
+                            processSelectedKeys(selector.selectedKeys());
+                        else
+                            processSelectedKeysOptimized(selectedKeys.flip());
                     }
 
                     long now = U.currentTimeMillis();
@@ -1431,10 +1488,58 @@ public class GridNioServer<T> {
          * @param keys Selected keys.
          * @throws ClosedByInterruptException If this thread was interrupted while reading data.
          */
+        private void processSelectedKeysOptimized(SelectionKey[] keys) throws ClosedByInterruptException {
+            for (int i = 0; ; i ++) { // No upper bound: the loop ends at the first null entry.
+                final SelectionKey key = keys[i];
+
+                if (key == null)
+                    break;
+
+                // Null out the entry in the array so the key can be GC'ed once its channel is closed.
+                // See https://github.com/netty/netty/issues/2363
+                keys[i] = null;
+
+                // Skip keys that are no longer valid.
+                if (!key.isValid())
+                    continue;
+
+                GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+                assert ses != null;
+
+                try {
+                    if (key.isReadable())
+                        processRead(key);
+
+                    if (key.isValid() && key.isWritable()) // Validity is re-checked after the read step.
+                        processWrite(key);
+                }
+                catch (ClosedByInterruptException e) {
+                    // This exception will be handled in bodyInternal() method.
+                    throw e;
+                }
+                catch (Exception e) {
+                    if (!closed)
+                        U.warn(log, "Failed to process selector key (will close): " + ses, e);
+
+                    close(ses, new GridNioException(e)); // The session is closed on any other failure.
+                }
+            }
+        }
+
+        /**
+         * Processes keys selected by a selector.
+         *
+         * @param keys Selected keys.
+         * @throws ClosedByInterruptException If this thread was interrupted while reading data.
+         */
         private void processSelectedKeys(Set<SelectionKey> keys) throws ClosedByInterruptException {
             if (log.isTraceEnabled())
                 log.trace("Processing keys in client worker: " + keys.size());
 
+            if (keys.isEmpty())
+                return;
+
             for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
                 SelectionKey key = iter.next();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index deb7d2b..1241f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -309,4 +309,4 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
new file mode 100644
index 0000000..9aa245d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SelectedSelectionKeySet.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+
+import java.nio.channels.SelectionKey;
+import java.util.AbstractSet;
+import java.util.Iterator;
+
+final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
+    // Two alternating arrays: add() appends to the active one; flip() returns it and activates the other.
+    private SelectionKey[] keysA;
+    private int keysASize;
+    private SelectionKey[] keysB;
+    private int keysBSize;
+    private boolean isA = true; // True while keysA is the array that add() writes to.
+
+    SelectedSelectionKeySet() {
+        keysA = new SelectionKey[1024]; // Initial capacity; add() doubles it when the array fills up.
+        keysB = keysA.clone();
+    }
+
+    @Override
+    public boolean add(SelectionKey o) {
+        if (o == null) {
+            return false; // Null keys are ignored.
+        }
+
+        if (isA) {
+            int size = keysASize;
+            keysA[size ++] = o;
+            keysASize = size;
+            if (size == keysA.length) { // Grow as soon as full, so flip() always has a slot for the null terminator.
+                doubleCapacityA();
+            }
+        } else {
+            int size = keysBSize;
+            keysB[size ++] = o;
+            keysBSize = size;
+            if (size == keysB.length) { // Same eager growth for the second array.
+                doubleCapacityB();
+            }
+        }
+
+        return true; // No duplicate check is performed.
+    }
+
+    private void doubleCapacityA() {
+        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
+        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
+        keysA = newKeysA;
+    }
+
+    private void doubleCapacityB() {
+        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
+        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
+        keysB = newKeysB;
+    }
+
+    SelectionKey[] flip() {
+        if (isA) {
+            isA = false;
+            keysA[keysASize] = null; // Null-terminate the array that is handed out.
+            keysBSize = 0; // Later add() calls fill keysB from index 0.
+            return keysA;
+        } else {
+            isA = true;
+            keysB[keysBSize] = null; // Null-terminate the array that is handed out.
+            keysASize = 0; // Later add() calls fill keysA from index 0.
+            return keysB;
+        }
+    }
+
+    @Override
+    public int size() {
+        if (isA) {
+            return keysASize;
+        } else {
+            return keysBSize;
+        }
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        return false; // Nothing is ever removed through this method.
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return false; // Membership is never reported, even for keys that were added.
+    }
+
+    @Override
+    public Iterator<SelectionKey> iterator() {
+        throw new UnsupportedOperationException(); // Keys are consumed via the array returned by flip().
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/75961eee/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
index 6098007..89e5f16 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
@@ -250,7 +250,9 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
 
     /** {@inheritDoc} */
     @Override public Set<Map.Entry<V1, V2>> entrySet() {
-        return Collections.<Entry<V1, V2>>singleton(this);
+        return isEmpty() ?
+            Collections.<Entry<V1,V2>>emptySet() :
+            Collections.<Entry<V1, V2>>singleton(this);
     }
 
     /** {@inheritDoc} */
@@ -301,4 +303,4 @@ public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
     @Override public String toString() {
         return S.toString(IgniteBiTuple.class, this);
     }
-}
\ No newline at end of file
+}


Mime
View raw message