ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [20/24] incubator-ignite git commit: ignite-545: merge from ignite-sprint-6
Date Tue, 09 Jun 2015 06:33:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b8668e6..02f16c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -24,6 +24,10 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -32,7 +36,6 @@ import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
-import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -146,9 +149,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
 
         if (c == null) {
-            if (log.isDebugEnabled())
-                log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg +
-                    ", nodeId=" + nodeId + ']');
+            U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
+                ", nodeId=" + nodeId + ']');
 
             return;
         }
@@ -226,69 +228,67 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
-            if (cacheMsg.allowForStartup())
-                processMessage(nodeId, cacheMsg, c);
+            if (cacheMsg.classError() != null)
+                processFailedMessage(nodeId, cacheMsg);
             else {
-                IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
-
-                if (startFut.isDone())
+                if (cacheMsg.allowForStartup())
                     processMessage(nodeId, cacheMsg, c);
                 else {
-                    if (log.isDebugEnabled())
-                        log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
-                            ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
-
-                    // Don't hold this thread waiting for preloading to complete.
-                    startFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(final IgniteInternalFuture<?> f) {
-                            cctx.kernalContext().closure().runLocalSafe(
-                                new GridPlainRunnable() {
-                                    @Override public void run() {
-                                        rw.readLock();
-
-                                        try {
-                                            if (stopping) {
-                                                if (log.isDebugEnabled())
-                                                    log.debug("Received cache communication message while stopping " +
-                                                        "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
+                    IgniteInternalFuture<?> startFut = startFuture(cacheMsg);
 
-                                                return;
-                                            }
+                    if (startFut.isDone())
+                        processMessage(nodeId, cacheMsg, c);
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Waiting for start future to complete for message [nodeId=" + nodeId +
+                                ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
 
-                                            f.get();
+                        // Don't hold this thread waiting for preloading to complete.
+                        startFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(final IgniteInternalFuture<?> f) {
+                                cctx.kernalContext().closure().runLocalSafe(
+                                    new GridPlainRunnable() {
+                                        @Override public void run() {
+                                            rw.readLock();
 
-                                            if (log.isDebugEnabled())
-                                                log.debug("Start future completed for message [nodeId=" + nodeId +
-                                                    ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+                                            try {
+                                                if (stopping) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Received cache communication message while stopping " +
+                                                            "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']');
 
-                                            processMessage(nodeId, cacheMsg, c);
-                                        }
-                                        catch (IgniteCheckedException e) {
-                                            // Log once.
-                                            if (startErr.compareAndSet(false, true))
-                                                U.error(log, "Failed to complete preload start future " +
-                                                    "(will ignore message) " +
-                                                    "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
-                                        }
-                                        finally {
-                                            rw.readUnlock();
+                                                    return;
+                                                }
+
+                                                f.get();
+
+                                                if (log.isDebugEnabled())
+                                                    log.debug("Start future completed for message [nodeId=" + nodeId +
+                                                        ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']');
+
+                                                processMessage(nodeId, cacheMsg, c);
+                                            }
+                                            catch (IgniteCheckedException e) {
+                                                // Log once.
+                                                if (startErr.compareAndSet(false, true))
+                                                    U.error(log, "Failed to complete preload start future " +
+                                                        "(will ignore message) " +
+                                                        "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e);
+                                            }
+                                            finally {
+                                                rw.readUnlock();
+                                            }
                                         }
                                     }
-                                }
-                            );
-                        }
-                    });
+                                );
+                            }
+                        });
+                    }
                 }
             }
         }
         catch (Throwable e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                U.error(log, "Failed to process message (note that distributed services " +
-                    "do not support peer class loading, if you deploy distributed service " +
-                    "you should have all required classes in CLASSPATH on all nodes in topology) " +
-                    "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']');
-            else
-                U.error(log, "Failed to process message [senderId=" + nodeId + ']', e);
+            U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e);
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -302,6 +302,208 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Sends response on failed message.
+     * @param nodeId node id.
+     * @param res response.
+     * @param cctx shared context.
+     * @param plc grid io policy.
+     */
+    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx,
+        GridIoPolicy plc) {
+        try {
+            cctx.io().send(nodeId, res, plc);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
+                ",res=" + res + ']', e);
+        }
+    }
+
+    /**
+     * Processes failed messages.
+     * @param nodeId niode id.
+     * @param msg message.
+     * @throws IgniteCheckedException
+     */
+    private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
+        GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
+
+        switch (msg.directType()) {
+            case 14: {
+                GridCacheEvictionRequest req = (GridCacheEvictionRequest)msg;
+
+                GridCacheEvictionResponse res = new GridCacheEvictionResponse(
+                    ctx.cacheId(),
+                    req.futureId(),
+                    req.classError() != null
+                );
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 30: {
+                GridDhtLockRequest req = (GridDhtLockRequest)msg;
+
+                GridDhtLockResponse res = new GridDhtLockResponse(
+                    ctx.cacheId(),
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    0);
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 34: {
+                GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
+
+                GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(
+                    req.version(),
+                    req.futureId(),
+                    req.miniId());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+            }
+
+            break;
+
+            case 38: {
+                GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
+
+                GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    req.futureVersion());
+
+                res.onError(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 40: {
+                GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+
+                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    nodeId,
+                    req.futureVersion());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 42: {
+                GridDhtForceKeysRequest req = (GridDhtForceKeysRequest)msg;
+
+                GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+                    ctx.cacheId(),
+                    req.futureId(),
+                    req.miniId()
+                );
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 45: {
+                GridDhtPartitionSupplyMessage req = (GridDhtPartitionSupplyMessage)msg;
+
+                U.error(log, "Supply message cannot be unmarshalled.", req.classError());
+            }
+
+            break;
+
+            case 49: {
+                GridNearGetRequest req = (GridNearGetRequest)msg;
+
+                GridNearGetResponse res = new GridNearGetResponse(
+                    ctx.cacheId(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.version());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 50: {
+                GridNearGetResponse res = (GridNearGetResponse)msg;
+
+                GridPartitionedGetFuture fut = (GridPartitionedGetFuture)ctx.mvcc().future(
+                    res.version(), res.futureId());
+
+                if (fut == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+                    return;
+                }
+
+                res.error(res.classError());
+
+                fut.onResult(nodeId, res);
+            }
+
+            break;
+
+            case 51: {
+                GridNearLockRequest req = (GridNearLockRequest)msg;
+
+                GridNearLockResponse res = new GridNearLockResponse(
+                    ctx.cacheId(),
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    false,
+                    0,
+                    req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 55: {
+                GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
+
+                GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
+                    req.version(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.version(),
+                    req.version(),
+                    null, null, null);
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+            }
+
+            break;
+
+            default:
+                throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+                    + msg + "]");
+        }
+    }
+
+    /**
      * @param cacheMsg Cache message to get start future.
      * @return Preloader start future.
      */
@@ -744,16 +946,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
         }
         catch (IgniteCheckedException e) {
-            if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class,
-                    ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class))
-                cacheMsg.onClassError(e);
-            else
-                throw e;
+            cacheMsg.onClassError(e);
         }
         catch (Error e) {
             if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class,
                 UnsupportedClassVersionError.class))
-                    cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
+                cacheMsg.onClassError(new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e));
             else
                 throw e;
         }
@@ -782,7 +980,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings( {"CatchGenericClass", "unchecked"})
+        @SuppressWarnings({"CatchGenericClass", "unchecked"})
         @Override public void onMessage(final UUID nodeId, Object msg) {
             if (log.isDebugEnabled())
                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 3dcd0ec..92035af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -115,9 +115,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     @GridToStringInclude
     private final int hash;
 
-    /** Off-heap value pointer. */
-    protected long valPtr;
-
     /** Extras */
     @GridToStringInclude
     private GridCacheEntryExtras extras;
@@ -188,7 +185,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         if (cctx.cache().isIgfsDataCache() &&
             cctx.kernalContext().igfsHelper().isIgfsBlockKey(key.value(cctx.cacheObjectContext(), false))) {
             int newSize = valueLength0(val, null);
-            int oldSize = valueLength0(this.val, (this.val == null && valPtr != 0) ? valueBytes0() : null);
+            int oldSize = valueLength0(this.val, (this.val == null && hasOffHeapPointer()) ? valueBytes0() : null);
 
             int delta = newSize - oldSize;
 
@@ -199,7 +196,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         if (!isOffHeapValuesOnly()) {
             this.val = val;
 
-            valPtr = 0;
+            offHeapPointer(0);
         }
         else {
             try {
@@ -227,12 +224,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 if (val != null) {
                     byte type = val.type();
 
-                    valPtr = mem.putOffHeap(valPtr, val.valueBytes(cctx.cacheObjectContext()), type);
+                    offHeapPointer(mem.putOffHeap(offHeapPointer(), val.valueBytes(cctx.cacheObjectContext()), type));
                 }
                 else {
-                    mem.removeOffHeap(valPtr);
+                    mem.removeOffHeap(offHeapPointer());
 
-                    valPtr = 0;
+                    offHeapPointer(0);
                 }
             }
             catch (IgniteCheckedException e) {
@@ -270,7 +267,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
         CacheObject val0 = val;
 
-        if (val0 == null && valPtr != 0) {
+        if (val0 == null && hasOffHeapPointer()) {
             IgniteBiTuple<byte[], Byte> t = valueBytes0();
 
             return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
@@ -434,16 +431,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                     if (e != null) {
                         if (e.offheapPointer() > 0) {
-                            valPtr = e.offheapPointer();
+                            offHeapPointer(e.offheapPointer());
 
                             if (needVal) {
-                                CacheObject val = cctx.fromOffheap(valPtr, false);
+                                CacheObject val = cctx.fromOffheap(offHeapPointer(), false);
 
                                 e.value(val);
                             }
                         }
                         else // Read from swap.
-                            valPtr = 0;
+                            offHeapPointer(0);
                     }
                 }
                 else
@@ -468,7 +465,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                         // Must update valPtr again since update() will reset it.
                         if (cctx.offheapTiered() && e.offheapPointer() > 0)
-                            valPtr = e.offheapPointer();
+                            offHeapPointer(e.offheapPointer());
 
                         return val;
                     }
@@ -495,13 +492,13 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 if (cctx.offheapTiered()) {
                     cctx.swap().removeOffheap(key);
 
-                    valPtr = 0;
+                    offHeapPointer(0);
                 }
 
                 return;
             }
 
-            if (val == null && cctx.offheapTiered() && valPtr != 0) {
+            if (val == null && cctx.offheapTiered() && hasOffHeapPointer()) {
                 if (log.isDebugEnabled())
                     log.debug("Value did not change, skip write swap entry: " + this);
 
@@ -540,10 +537,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     protected IgniteBiTuple<byte[], Byte> valueBytes0() {
         assert Thread.holdsLock(this);
 
-        if (valPtr != 0) {
+        if (hasOffHeapPointer()) {
             assert isOffHeapValuesOnly() || cctx.offheapTiered();
 
-            return cctx.unsafeMemory().get(valPtr);
+            return cctx.unsafeMemory().get(offHeapPointer());
         }
         else {
             assert val != null;
@@ -672,7 +669,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
             CacheObject val = this.val;
 
-            hasOldBytes = valPtr != 0;
+            hasOldBytes = hasOffHeapPointer();
 
             if ((unmarshal || isOffHeapValuesOnly()) && !expired && val == null && hasOldBytes)
                 val = rawGetOrUnmarshalUnlocked(tmp);
@@ -816,7 +813,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                         // Update indexes before actual write to entry.
                         updateIndex(ret, expTime, nextVer, prevVal);
 
-                    boolean hadValPtr = valPtr != 0;
+                    boolean hadValPtr = hasOffHeapPointer();
 
                     // Don't change version for read-through.
                     update(ret, expTime, ttl, nextVer);
@@ -964,13 +961,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         synchronized (this) {
             checkObsolete();
 
-            if (cctx.kernalContext().config().isCacheSanityCheckEnabled()) {
-                if (tx != null && tx.groupLock())
-                    groupLockSanityCheck(tx);
-                else
-                    assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
-                        "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']';
-            }
+            assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
+                "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']';
 
             // Load and remove from swap if it is new.
             boolean startVer = isStartVersion();
@@ -1128,10 +1120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         synchronized (this) {
             checkObsolete();
 
-            if (tx != null && tx.groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled())
-                groupLockSanityCheck(tx);
-            else
-                assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
+            assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
                     "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
 
             boolean startVer = isStartVersion();
@@ -1164,7 +1153,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             // can be updated without actually holding entry lock.
             clearIndex(old);
 
-            boolean hadValPtr = valPtr != 0;
+            boolean hadValPtr = hasOffHeapPointer();
 
             update(null, 0, 0, newVer);
 
@@ -1198,7 +1187,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                 obsoleteVer = newVer;
             else {
                 // Only delete entry if the lock is not explicit.
-                if (tx.groupLock() || lockedBy(tx.xidVersion()))
+                if (lockedBy(tx.xidVersion()))
                     obsoleteVer = tx.xidVersion();
                 else if (log.isDebugEnabled())
                     log.debug("Obsolete version was not set because lock was explicit: " + this);
@@ -1521,7 +1510,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     // Must persist inside synchronization in non-tx mode.
                     cctx.store().remove(null, keyValue(false));
 
-                boolean hasValPtr = valPtr != 0;
+                boolean hasValPtr = hasOffHeapPointer();
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
@@ -2122,7 +2111,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 enqueueVer = newVer;
 
-                boolean hasValPtr = valPtr != 0;
+                boolean hasValPtr = hasOffHeapPointer();
 
                 // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
                 update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
@@ -2799,25 +2788,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     }
 
     /**
-     * Checks that entries in group locks transactions are not locked during commit.
-     *
-     * @param tx Transaction to check.
-     * @throws GridCacheEntryRemovedException If entry is obsolete.
-     * @throws IgniteCheckedException If entry was externally locked.
-     */
-    private void groupLockSanityCheck(IgniteInternalTx tx) throws GridCacheEntryRemovedException, IgniteCheckedException {
-        assert tx.groupLock();
-
-        IgniteTxEntry txEntry = tx.entry(txKey());
-
-        if (txEntry.groupLockEntry()) {
-            if (lockedByAny())
-                throw new IgniteCheckedException("Failed to update cache entry (entry was externally locked while " +
-                    "accessing entry within group lock transaction) [entry=" + this + ", tx=" + tx + ']');
-        }
-    }
-
-    /**
      * @param failFast Fail fast flag.
      * @param topVer Topology version.
      * @param filter Filter.
@@ -2929,8 +2899,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
         if (val != null)
             return val;
 
-        if (valPtr != 0) {
-            CacheObject val0 = cctx.fromOffheap(valPtr, tmp);
+        if (hasOffHeapPointer()) {
+            CacheObject val0 = cctx.fromOffheap(offHeapPointer(), tmp);
 
             if (!tmp && cctx.kernalContext().config().isPeerClassLoadingEnabled())
                 val0.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
@@ -2952,7 +2922,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     protected boolean hasValueUnlocked() {
         assert Thread.holdsLock(this);
 
-        return val != null || valPtr != 0;
+        return val != null || hasOffHeapPointer();
     }
 
     /** {@inheritDoc} */
@@ -3292,12 +3262,13 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     @Override public boolean onTtlExpired(GridCacheVersion obsoleteVer) {
         boolean obsolete = false;
         boolean deferred = false;
+        GridCacheVersion ver0 = null;
 
         try {
             synchronized (this) {
                 CacheObject expiredVal = saveValueForIndexUnlocked();
 
-                boolean hasOldBytes = valPtr != 0;
+                boolean hasOldBytes = hasOffHeapPointer();
 
                 boolean expired = checkExpired();
 
@@ -3305,7 +3276,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     if (!obsolete()) {
                         if (cctx.deferredDelete() && !detached() && !isInternal()) {
                             if (!deletedUnlocked()) {
-                                update(null, 0L, 0L, ver);
+                                update(null, 0L, 0L, ver0 = ver);
 
                                 deletedUnlocked(true);
 
@@ -3345,11 +3316,20 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             U.error(log, "Failed to clean up expired cache entry: " + this, e);
         }
         finally {
-            if (obsolete)
+            if (obsolete) {
                 onMarkedObsolete();
 
-            if (deferred)
-                cctx.onDeferredDelete(this, obsoleteVer);
+                cctx.cache().removeEntry(this);
+            }
+
+            if (deferred) {
+                assert ver0 != null;
+
+                cctx.onDeferredDelete(this, ver0);
+            }
+
+            if ((obsolete || deferred) && cctx.cache().configuration().isStatisticsEnabled())
+                cctx.cache().metrics0().onEvict();
         }
 
         return obsolete;
@@ -3468,10 +3448,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             GridCacheQueryManager qryMgr = cctx.queries();
 
             if (qryMgr != null && qryMgr.enabled()) {
-                qryMgr.store(key.value(cctx.cacheObjectContext(), false),
-                    null,
-                    CU.value(val, cctx, false),
-                    null,
+                qryMgr.store(key,
+                    val,
                     ver,
                     expireTime);
             }
@@ -3494,8 +3472,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
             GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
 
             if (qryMgr != null)
-                qryMgr.remove(key().value(cctx.cacheObjectContext(), false),
-                    prevVal == null ? null : prevVal.value(cctx.cacheObjectContext(), false));
+                qryMgr.remove(key(), prevVal == null ? null : prevVal);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -4095,6 +4072,27 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     }
 
     /**
+     * @return True if entry has off-heap value pointer.
+     */
+    protected boolean hasOffHeapPointer() {
+        return false;
+    }
+
+    /**
+     * @return Off-heap value pointer.
+     */
+    protected long offHeapPointer() {
+        return 0;
+    }
+
+    /**
+     * @param valPtr Off-heap value pointer.
+     */
+    protected void offHeapPointer(long valPtr) {
+        // No-op.
+    }
+
+    /**
      * @return Size of extras object.
      */
     private int extrasSize() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index fefd582..5432c90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -60,7 +60,7 @@ public abstract class GridCacheMessage implements Message {
 
     /** */
     @GridDirectTransient
-    private Exception err;
+    private IgniteCheckedException err;
 
     /** */
     @GridDirectTransient
@@ -115,14 +115,14 @@ public abstract class GridCacheMessage implements Message {
      *
      * @param err Error.
      */
-    public void onClassError(Exception err) {
+    public void onClassError(IgniteCheckedException err) {
         this.err = err;
     }
 
     /**
-     * @return Error set via {@link #onClassError(Exception)} method.
+     * @return Error set via {@link #onClassError(IgniteCheckedException)} method.
      */
-    public Exception classError() {
+    public IgniteCheckedException classError() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0bb97a9..c05e4b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -510,7 +510,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Future.
      */
     @SuppressWarnings({"unchecked"})
-    @Nullable public <T> GridCacheFuture<T> future(GridCacheVersion ver, IgniteUuid futId) {
+    @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
         Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
 
         if (futs != null)
@@ -519,7 +519,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     if (log.isDebugEnabled())
                         log.debug("Found future in futures map: " + fut);
 
-                    return (GridCacheFuture<T>)fut;
+                    return fut;
                 }
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5f82ae2..0ecaf97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -409,10 +409,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
-    public @Nullable IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
+    @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
 
-        if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) {
+        if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) {
             if (log.isDebugEnabled())
                 log.debug("Return lastInitializedFut for topology ready future " +
                     "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
@@ -745,6 +745,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int skipped = 0;
 
             for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
+                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0)
+                    continue;
+
                 skipped++;
 
                 if (skipped == EXCH_FUT_CLEANUP_HISTORY_SIZE) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 83f1fed..0e1a9c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -124,6 +124,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Must use JDK marshaller since it is used by discovery to fire custom events. */
     private Marshaller marshaller = new JdkMarshaller();
 
+    /** Count down latch for caches. */
+    private final CountDownLatch cacheStartedLatch = new CountDownLatch(1);
+
     /**
      * @param ctx Kernal context.
      */
@@ -607,7 +610,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 
-            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(cfg, cacheType, template, IgniteUuid.randomUuid());
+            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
+                IgniteUuid.randomUuid());
 
             desc.locallyConfigured(true);
             desc.staticallyConfigured(true);
@@ -635,7 +639,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (cfg.getName() == null) { // Use cache configuration with null name as template.
                 DynamicCacheDescriptor desc0 =
-                    new DynamicCacheDescriptor(cfg, cacheType, true, IgniteUuid.randomUuid());
+                    new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
 
                 desc0.locallyConfigured(true);
                 desc0.staticallyConfigured(true);
@@ -657,87 +661,77 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        ClusterNode locNode = ctx.discovery().localNode();
-
-        // Init cache plugin managers.
-        final Map<String, CachePluginManager> cache2PluginMgr = new HashMap<>();
-
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            CacheConfiguration locCcfg = desc.cacheConfiguration();
-
-            CachePluginManager pluginMgr = new CachePluginManager(ctx, locCcfg);
-
-            cache2PluginMgr.put(locCcfg.getName(), pluginMgr);
-        }
+        try {
+            if (ctx.config().isDaemon())
+                return;
 
-        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes()) {
-                checkTransactionConfiguration(n);
+            ClusterNode locNode = ctx.discovery().localNode();
 
-                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
-                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
+            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
+                for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    checkTransactionConfiguration(n);
 
-                CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
-                    locDepMode, rmtDepMode, true);
+                    DeploymentMode locDepMode = ctx.config().getDeploymentMode();
+                    DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 
-                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
+                    CU.checkAttributeMismatch(log, null, n.id(), "deploymentMode", "Deployment mode",
+                        locDepMode, rmtDepMode, true);
 
-                    if (rmtCfg != null) {
-                        CacheConfiguration locCfg = desc.cacheConfiguration();
+                    for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                        CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 
-                        checkCache(locCfg, rmtCfg, n);
+                        if (rmtCfg != null) {
+                            CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                        // Check plugin cache configurations.
-                        CachePluginManager pluginMgr = cache2PluginMgr.get(locCfg.getName());
+                            checkCache(locCfg, rmtCfg, n);
 
-                        assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                            // Check plugin cache configurations.
+                            CachePluginManager pluginMgr = desc.pluginManager();
 
-                        pluginMgr.validateRemotes(rmtCfg, n);
+                            pluginMgr.validateRemotes(rmtCfg, n);
+                        }
                     }
                 }
             }
-        }
-
-        // Start dynamic caches received from collect discovery data.
-        for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            boolean started = desc.onStart();
 
-            assert started : "Failed to change started flag for locally configured cache: " + desc;
+            // Start dynamic caches received from collect discovery data.
+            for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                boolean started = desc.onStart();
 
-            desc.clearRemoteConfigurations();
+                assert started : "Failed to change started flag for locally configured cache: " + desc;
 
-            CacheConfiguration ccfg = desc.cacheConfiguration();
+                desc.clearRemoteConfigurations();
 
-            IgnitePredicate filter = ccfg.getNodeFilter();
+                CacheConfiguration ccfg = desc.cacheConfiguration();
 
-            if (filter.apply(locNode)) {
-                CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
+                IgnitePredicate filter = ccfg.getNodeFilter();
 
-                CachePluginManager pluginMgr = cache2PluginMgr.get(ccfg.getName());
+                if (filter.apply(locNode)) {
+                    CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-                assert pluginMgr != null : " Map=" + cache2PluginMgr;
+                    CachePluginManager pluginMgr = desc.pluginManager();
 
-                GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
 
-                ctx.dynamicDeploymentId(desc.deploymentId());
+                    ctx.dynamicDeploymentId(desc.deploymentId());
 
-                sharedCtx.addCacheContext(ctx);
+                    sharedCtx.addCacheContext(ctx);
 
-                GridCacheAdapter cache = ctx.cache();
+                    GridCacheAdapter cache = ctx.cache();
 
-                String name = ccfg.getName();
+                    String name = ccfg.getName();
 
-                caches.put(maskNull(name), cache);
+                    caches.put(maskNull(name), cache);
 
-                startCache(cache);
+                    startCache(cache);
 
-                jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                    jCacheProxies.put(maskNull(name), new IgniteCacheProxy(ctx, cache, null, false));
+                }
             }
         }
+        finally {
+            cacheStartedLatch.countDown();
+        }
 
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
@@ -835,6 +829,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStop(boolean cancel) {
+        cacheStartedLatch.countDown();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -959,6 +955,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @throws IgniteCheckedException If failed to wait.
+     */
+    public void awaitStarted() throws IgniteCheckedException {
+        U.await(cacheStartedLatch);
+    }
+
+    /**
      * @param cache Cache.
      * @throws IgniteCheckedException If failed.
      */
@@ -1640,6 +1643,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     if (existing == null) {
                         DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                            ctx,
                             ccfg,
                             req.cacheType(),
                             true,
@@ -1673,6 +1677,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         assert req.cacheType() != null : req;
 
                         DynamicCacheDescriptor desc = new DynamicCacheDescriptor(
+                            ctx,
                             ccfg,
                             req.cacheType(),
                             false,
@@ -2022,7 +2027,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (desc == null) {
                     DynamicCacheDescriptor templateDesc =
-                        new DynamicCacheDescriptor(ccfg, req.cacheType(), true, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), true, req.deploymentId());
 
                     DynamicCacheDescriptor old = registeredTemplates.put(maskNull(ccfg.getName()), templateDesc);
 
@@ -2076,7 +2081,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     assert req.cacheType() != null : req;
 
                     DynamicCacheDescriptor startDesc =
-                        new DynamicCacheDescriptor(ccfg, req.cacheType(), false, req.deploymentId());
+                        new DynamicCacheDescriptor(ctx, ccfg, req.cacheType(), false, req.deploymentId());
 
                     DynamicCacheDescriptor old = registeredCaches.put(maskNull(ccfg.getName()), startDesc);
 
@@ -2381,8 +2386,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     assert val != null;
 
-                    qryMgr.remove(key.value(cctx.cacheObjectContext(), false),
-                        val.value(cctx.cacheObjectContext(), false));
+                    qryMgr.remove(key, val);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to unmarshal key evicted from swap [swapSpaceName=" + spaceName + ']', e);
@@ -2448,6 +2452,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param name Cache name.
+     * @return Cache instance for given name.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> IgniteInternalCache<K, V> getOrStartCache(@Nullable String name) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Getting cache for name: " + name);
+
+        IgniteCache<K, V> jcache = (IgniteCache<K, V>)jCacheProxies.get(maskNull(name));
+
+        if (jcache == null)
+            jcache = startJCache(name, true);
+
+        return jcache == null ? null : ((IgniteCacheProxy<K, V>)jcache).internalProxy();
+    }
+
+    /**
      * @return All configured cache instances.
      */
     public Collection<IgniteInternalCache<?, ?>> caches() {
@@ -2554,37 +2576,53 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
-        if (cache == null) {
-            if (desc == null || desc.cancelled()) {
-                if (failIfNotStarted)
-                    throw new IllegalArgumentException("Cache is not started: " + cacheName);
+        if (cache == null)
+           cache = startJCache(cacheName, failIfNotStarted);
 
-                return null;
-            }
+        return cache;
+    }
 
-            DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
+    /**
+     * @param cacheName Cache name.
+     * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
+     *        otherwise returns {@code null} in this case.
+     * @return Cache instance for given name.
+     * @throws IgniteCheckedException If failed.
+     */
+    private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+        String masked = maskNull(cacheName);
 
-            req.cacheName(cacheName);
+        DynamicCacheDescriptor desc = registeredCaches.get(masked);
 
-            req.deploymentId(desc.deploymentId());
+        if (desc == null || desc.cancelled()) {
+            if (failIfNotStarted)
+                throw new IllegalArgumentException("Cache is not started: " + cacheName);
 
-            CacheConfiguration cfg = new CacheConfiguration(desc.cacheConfiguration());
+            return null;
+        }
 
-            cfg.setNearConfiguration(null);
+        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
 
-            req.startCacheConfiguration(cfg);
+        req.cacheName(cacheName);
 
-            req.cacheType(desc.cacheType());
+        req.deploymentId(desc.deploymentId());
 
-            req.clientStartOnly(true);
+        CacheConfiguration cfg = new CacheConfiguration(desc.cacheConfiguration());
 
-            F.first(initiateCacheChanges(F.asList(req))).get();
+        cfg.setNearConfiguration(null);
 
-            cache = (IgniteCache<K, V>)jCacheProxies.get(masked);
+        req.startCacheConfiguration(cfg);
 
-            if (cache == null && failIfNotStarted)
-                throw new IllegalArgumentException("Cache is not started: " + cacheName);
-        }
+        req.cacheType(desc.cacheType());
+
+        req.clientStartOnly(true);
+
+        F.first(initiateCacheChanges(F.asList(req))).get();
+
+        IgniteCache cache = jCacheProxies.get(masked);
+
+        if (cache == null && failIfNotStarted)
+            throw new IllegalArgumentException("Cache is not started: " + cacheName);
 
         return cache;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 5487944..55d2f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1390,30 +1390,6 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException {
-        CacheOperationContext prev = gate.enter(opCtx);
-
-        try {
-            return delegate.swapIterator();
-        }
-        finally {
-            gate.leave(prev);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException {
-        CacheOperationContext prev = gate.enter(opCtx);
-
-        try {
-            return delegate.offHeapIterator();
-        }
-        finally {
-            gate.leave(prev);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public long offHeapEntriesCount() {
         CacheOperationContext prev = gate.enter(opCtx);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index fb6b103..eb82218 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -551,11 +551,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (qryMgr != null) {
-                    qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false),
-                            entry.value().value(cctx.cacheObjectContext(), false),
-                            entry.valueBytes());
-                }
+                if (qryMgr != null)
+                    qryMgr.onUnswap(key, entry.value());
 
                 return entry;
             }
@@ -619,11 +616,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                         GridCacheQueryManager qryMgr = cctx.queries();
 
-                        if (qryMgr != null) {
-                            qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false),
-                                v.value(cctx.cacheObjectContext(), false),
-                                valBytes);
-                        }
+                        if (qryMgr != null)
+                            qryMgr.onUnswap(key, v);
                     }
                     catch (IgniteCheckedException e) {
                         err.set(e);
@@ -758,9 +752,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                 EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null);
 
                         if (qryMgr != null)
-                            qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false),
-                                    entry.value().value(cctx.cacheObjectContext(), false),
-                                    entry.valueBytes());
+                            qryMgr.onUnswap(key, entry.value());
 
                         GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key,
                             part,
@@ -859,11 +851,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                             // Always fire this event, since preloading depends on it.
                             onUnswapped(swapKey.partition(), key, entry);
 
-                            if (qryMgr != null) {
-                                qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false),
-                                        entry.value().value(cctx.cacheObjectContext(), false),
-                                        entry.valueBytes());
-                            }
+                            if (qryMgr != null)
+                                qryMgr.onUnswap(key, entry.value());
                         }
                         catch (IgniteCheckedException e) {
                             err.set(e);
@@ -941,9 +930,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     if (entry == null)
                         return;
 
-                    qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false),
-                            entry.value().value(cctx.cacheObjectContext(), false),
-                            entry.valueBytes());
+                    qryMgr.onUnswap(key, entry.value());
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException(e);
@@ -1030,7 +1017,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         GridCacheQueryManager qryMgr = cctx.queries();
 
         if (qryMgr != null)
-            qryMgr.onSwap(spaceName, key.value(cctx.cacheObjectContext(), false));
+            qryMgr.onSwap(key);
     }
 
     /**
@@ -1059,7 +1046,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
 
                 if (qryMgr != null)
-                    qryMgr.onSwap(spaceName, swapEntry.key().value(cctx.cacheObjectContext(), false));
+                    qryMgr.onSwap(swapEntry.key());
             }
         }
         else {
@@ -1081,7 +1068,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
 
                     if (qryMgr != null)
-                        qryMgr.onSwap(spaceName, batchSwapEntry.key().value(cctx.cacheObjectContext(), false));
+                        qryMgr.onSwap(batchSwapEntry.key());
                 }
             }
         }
@@ -1224,10 +1211,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         checkIteratorQueue();
 
         if (offHeapEnabled() && !swapEnabled())
-            return rawOffHeapIterator();
+            return rawOffHeapIterator(true, true);
 
         if (swapEnabled() && !offHeapEnabled())
-            return rawSwapIterator();
+            return rawSwapIterator(true, true);
 
         // Both, swap and off-heap are enabled.
         return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
@@ -1240,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             private Map.Entry<byte[], byte[]> cur;
 
             {
-                it = rawOffHeapIterator();
+                it = rawOffHeapIterator(true, true);
 
                 advance();
             }
@@ -1254,7 +1241,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 if (offheapFlag) {
                     offheapFlag = false;
 
-                    it = rawSwapIterator();
+                    it = rawSwapIterator(true, true);
 
                     if (!it.hasNext()) {
                         it.close();
@@ -1326,7 +1313,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
-        return new PartitionsKeyIterator(parts) {
+        return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
             @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
                 throws IgniteCheckedException
             {
@@ -1351,7 +1338,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
-        return new PartitionsKeyIterator(parts) {
+        return new PartitionsAbstractIterator<KeyCacheObject>(parts) {
             @Override protected Iterator<KeyCacheObject> partitionIterator(int part)
                 throws IgniteCheckedException
             {
@@ -1567,37 +1554,91 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param c Key/value closure.
+     * @param primary Include primaries.
+     * @param backup Include backups.
      * @return Off-heap iterator.
      */
-    public <T> GridCloseableIterator<T> rawOffHeapIterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
+    public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+        boolean primary,
+        boolean backup)
+    {
         assert c != null;
 
-        if (!offheapEnabled)
+        if (!offheapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
         checkIteratorQueue();
 
-        return offheap.iterator(spaceName, c);
+        if (primary && backup)
+            return offheap.iterator(spaceName, c);
+
+        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+        return new CloseablePartitionsIterator<T, T>(parts) {
+            @Override protected GridCloseableIterator<T> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return offheap.iterator(spaceName, c, part);
+            }
+        };
     }
 
     /**
+     * @param primary Include primaries.
+     * @param backup Include backups.
      * @return Raw off-heap iterator.
      */
-    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator() {
-        if (!offheapEnabled)
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary,
+        final boolean backup)
+    {
+        if (!offheapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
-        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);
+        if (primary && backup)
+            return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
+                private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);
+
+                private Map.Entry<byte[], byte[]> cur;
+
+                @Override protected Map.Entry<byte[], byte[]> onNext() {
+                    return cur = it.next();
+                }
+
+                @Override protected boolean onHasNext() {
+                    return it.hasNext();
+                }
+
+                @Override protected void onRemove() throws IgniteCheckedException {
+                    KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
 
+                    int part = cctx.affinity().partition(key);
+
+                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                }
+
+                @Override protected void onClose() throws IgniteCheckedException {
+                    it.close();
+                }
+            };
+
+        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+        return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) {
             private Map.Entry<byte[], byte[]> cur;
 
             @Override protected Map.Entry<byte[], byte[]> onNext() {
-                return cur = it.next();
+                return cur = super.onNext();
             }
 
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
+            @Override protected GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException {
+                return offheap.iterator(spaceName, part);
             }
 
             @Override protected void onRemove() throws IgniteCheckedException {
@@ -1607,10 +1648,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
             }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                it.close();
-            }
         };
     }
 
@@ -1634,15 +1671,33 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @return Raw off-heap iterator.
+     * @param primary Include primaries.
+     * @param backup Include backups.
      * @throws IgniteCheckedException If failed.
      */
-    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator() throws IgniteCheckedException {
-        if (!swapEnabled)
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup)
+        throws IgniteCheckedException
+    {
+        if (!swapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
         checkIteratorQueue();
 
-        return swapMgr.rawIterator(spaceName);
+        if (primary && backup)
+            return swapMgr.rawIterator(spaceName);
+
+        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
+
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+
+        return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(parts) {
+            @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)
+                throws IgniteCheckedException
+            {
+                return swapMgr.rawIterator(spaceName, part);
+            }
+        };
     }
 
     /**
@@ -1667,7 +1722,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
         return new PartitionsIterator<K, V>(parts) {
-            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part)
                 throws IgniteCheckedException
             {
                 return swapMgr.rawIterator(spaceName, part);
@@ -1682,7 +1737,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Offheap entries iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer)
+    public <K, V> Iterator<Cache.Entry<K, V>> offheapIterator(boolean primary,
+        boolean backup,
+        AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {
         assert primary || backup;
@@ -1697,7 +1754,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
         return new PartitionsIterator<K, V>(parts) {
-            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part) {
+            @Override protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part) {
                 return offheap.iterator(spaceName, part);
             }
         };
@@ -1897,20 +1954,46 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     /**
      *
      */
-    private abstract class PartitionsIterator<K, V> implements Iterator<Cache.Entry<K, V>> {
+    private abstract class PartitionsIterator<K, V> extends PartitionsAbstractIterator<Cache.Entry<K, V>> {
+        /**
+         * @param parts Partitions
+         */
+        public PartitionsIterator(Collection<Integer> parts) {
+            super(parts);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Iterator<Cache.Entry<K, V>> partitionIterator(int part)
+            throws IgniteCheckedException {
+            return cacheEntryIterator(GridCacheSwapManager.this.<K, V>lazyIterator(nextPartition(part)));
+        }
+
+        /**
+         * @param part Partition.
+         * @return Iterator for given partition.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> nextPartition(int part)
+            throws IgniteCheckedException;
+    }
+
+    /**
+     *
+     */
+    private abstract class PartitionsAbstractIterator<T> implements Iterator<T> {
         /** */
         private Iterator<Integer> partIt;
 
         /** */
-        private Iterator<Cache.Entry<K, V>> curIt;
+        private Iterator<T> curIt;
 
         /** */
-        private Cache.Entry<K, V> next;
+        private T next;
 
         /**
          * @param parts Partitions
          */
-        public PartitionsIterator(Collection<Integer> parts) {
+        public PartitionsAbstractIterator(Collection<Integer> parts) {
             this.partIt = parts.iterator();
 
             advance();
@@ -1922,11 +2005,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public Cache.Entry<K, V> next() {
+        @Override public T next() {
             if (next == null)
                 throw new NoSuchElementException();
 
-            Cache.Entry<K, V> e = next;
+            T e = next;
 
             advance();
 
@@ -1950,8 +2033,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         int part = partIt.next();
 
                         try {
-                            curIt = cacheEntryIterator(
-                                GridCacheSwapManager.this.<K, V>lazyIterator(partitionIterator(part)));
+                            curIt = partitionIterator(part);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteException(e);
@@ -1977,58 +2059,70 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
          * @return Iterator for given partition.
          * @throws IgniteCheckedException If failed.
          */
-        abstract protected GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> partitionIterator(int part)
+        abstract protected Iterator<T> partitionIterator(int part)
             throws IgniteCheckedException;
     }
 
     /**
      *
      */
-    private abstract class PartitionsKeyIterator implements Iterator<KeyCacheObject> {
+    private abstract class CloseablePartitionsIterator<T, T1 extends T> extends GridCloseableIteratorAdapter<T> {
         /** */
         private Iterator<Integer> partIt;
 
         /** */
-        private Iterator<KeyCacheObject> curIt;
+        protected GridCloseableIterator<T1> curIt;
 
         /** */
-        private KeyCacheObject next;
+        protected T next;
 
         /**
          * @param parts Partitions
          */
-        public PartitionsKeyIterator(Collection<Integer> parts) {
+        public CloseablePartitionsIterator(Collection<Integer> parts) {
             this.partIt = parts.iterator();
 
-            advance();
+            try {
+                advance();
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public boolean hasNext() {
+        @Override protected boolean onHasNext() {
             return next != null;
         }
 
         /** {@inheritDoc} */
-        @Override public KeyCacheObject next() {
-            if (next == null)
-                throw new NoSuchElementException();
+        @Override protected T onNext() {
+            try {
+                if (next == null)
+                    throw new NoSuchElementException();
 
-            KeyCacheObject e = next;
+                T e = next;
 
-            advance();
+                advance();
 
-            return e;
+                return e;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
+        @Override protected void onClose() throws IgniteCheckedException {
+            if (curIt != null)
+                curIt.close();
         }
 
         /**
          * Switches to next element.
+         * @throws IgniteCheckedException If failed.
          */
-        private void advance() {
+        private void advance() throws IgniteCheckedException {
             next = null;
 
             do {
@@ -2051,8 +2145,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                         break;
                     }
-                    else
+                    else {
+                        curIt.close();
+
                         curIt = null;
+                    }
                 }
             }
             while (partIt.hasNext());
@@ -2063,7 +2160,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
          * @return Iterator for given partition.
          * @throws IgniteCheckedException If failed.
          */
-        abstract protected Iterator<KeyCacheObject> partitionIterator(int part)
-            throws IgniteCheckedException;
+        abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5198b53..5f9049a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -26,22 +26,21 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.thread.*;
 
-import java.util.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 /**
- * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
+ * Eagerly removes expired entries from cache when
+ * {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
  */
 @SuppressWarnings("NakedNotify")
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
-    private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>();
+    private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
 
     /** Cleanup worker thread. */
     private CleanupWorker cleanupWorker;
 
-    /** Sync mutex. */
-    private final Object mux = new Object();
-
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
@@ -68,24 +67,15 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
      * @param entry Entry to add.
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
-        EntryWrapper wrapper = new EntryWrapper(entry);
-
-        pendingEntries.add(wrapper);
-
-        // If entry is on the first position, notify waiting thread.
-        if (wrapper == pendingEntries.firstx()) {
-            synchronized (mux) {
-                mux.notifyAll();
-            }
-        }
+        pendingEntries.add(new EntryWrapper(entry));
     }
 
     /**
      * @param entry Entry to remove.
      */
     public void removeTrackedEntry(GridCacheMapEntry entry) {
-        // Remove must be called while holding lock on entry before updating expire time.
-        // No need to wake up waiting thread in this case.
+        assert Thread.holdsLock(entry);
+
         pendingEntries.remove(new EntryWrapper(entry));
     }
 
@@ -97,6 +87,32 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Expires entries by TTL.
+     */
+    public void expire() {
+        long now = U.currentTimeMillis();
+
+        GridCacheVersion obsoleteVer = null;
+
+        for (int size = pendingEntries.sizex(); size > 0; size--) {
+            EntryWrapper e = pendingEntries.firstx();
+
+            if (e == null || e.expireTime > now)
+                return;
+
+            if (pendingEntries.remove(e)) {
+                if (obsoleteVer == null)
+                    obsoleteVer = cctx.versions().next();
+
+                if (log.isTraceEnabled())
+                    log.trace("Trying to remove expired entry from cache: " + e);
+
+                e.entry.onTtlExpired(obsoleteVer);
+            }
+        }
+    }
+
+    /**
      * Entry cleanup worker.
      */
     private class CleanupWorker extends GridWorker {
@@ -110,52 +126,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             while (!isCancelled()) {
-                long now = U.currentTimeMillis();
-
-                GridCacheVersion obsoleteVer = null;
-
-                for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) {
-                    EntryWrapper wrapper = it.next();
+                expire();
 
-                    if (wrapper.expireTime <= now) {
-                        if (log.isDebugEnabled())
-                            log.debug("Trying to remove expired entry from cache: " + wrapper);
+                EntryWrapper first = pendingEntries.firstx();
 
-                        if (obsoleteVer == null)
-                            obsoleteVer = cctx.versions().next();
+                if (first != null) {
+                    long waitTime = first.expireTime - U.currentTimeMillis();
 
-                        if (wrapper.entry.onTtlExpired(obsoleteVer))
-                            wrapper.entry.context().cache().removeEntry(wrapper.entry);
-
-                        if (wrapper.entry.context().cache().configuration().isStatisticsEnabled())
-                            wrapper.entry.context().cache().metrics0().onEvict();
-
-                        it.remove();
-                    }
-                    else
-                        break;
-                }
-
-                synchronized (mux) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTrackedEntry(..)' method.
-                        EntryWrapper first = pendingEntries.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.expireTime - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux.wait(5000);
-                    }
+                    if (waitTime > 0)
+                        U.sleep(waitTime);
                 }
+                else
+                    U.sleep(500);
             }
         }
     }
@@ -214,4 +196,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
             return res;
         }
     }
+
+    /**
+     * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition:
+     * <ul>
+     *     <li>{@code #add()}</li>
+     *     <li>{@code #remove()}</li>
+     *     <li>{@code #pollFirst()}</li>
+     * <ul/>
+     */
+    private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Size. */
+        private final LongAdder8 size = new LongAdder8();
+
+        /**
+         * @return Size based on performed operations.
+         */
+        public int sizex() {
+            return size.intValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(EntryWrapper e) {
+            boolean res = super.add(e);
+
+            assert res;
+
+            size.increment();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            boolean res = super.remove(o);
+
+            if (res)
+                size.decrement();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public EntryWrapper pollFirst() {
+            EntryWrapper e = super.pollFirst();
+
+            if (e != null)
+                size.decrement();
+
+            return e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7c7f9d..549f42f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -1157,6 +1156,8 @@ public class GridCacheUtils {
 
         if (ctx.isNear())
             ctx.near().dht().context().evicts().unwind();
+
+        ctx.ttl().expire();
     }
 
     /**
@@ -1166,11 +1167,12 @@ public class GridCacheUtils {
         assert ctx != null;
 
         for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
-
             cacheCtx.evicts().unwind();
 
             if (cacheCtx.isNear())
                 cacheCtx.near().dht().context().evicts().unwind();
+
+            cacheCtx.ttl().expire();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 2de5bf0..f840015 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -542,7 +542,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     private void validate(Query qry) {
         if (!GridQueryProcessor.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
             !(qry instanceof ContinuousQuery))
-            throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name());
+            throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
+                ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index fe371ce..5184115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1451,33 +1451,6 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public long swapKeys() throws IgniteCheckedException;
 
     /**
-     * Gets iterator over keys and values belonging to this cache swap space on local node. This
-     * iterator is thread-safe, which means that cache (and therefore its swap space)
-     * may be modified concurrently with iteration over swap.
-     * <p>
-     * Returned iterator supports {@code remove} operation which delegates to
-     * <code>removex(Object, org.apache.ignite.lang.IgnitePredicate[])</code> method.
-     *
-     * @return Iterator over keys.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException;
-
-    /**
-     * Gets iterator over keys and values belonging to this cache off-heap memory on local node. This
-     * iterator is thread-safe, which means that cache (and therefore its off-heap memory)
-     * may be modified concurrently with iteration over off-heap. To achieve better performance
-     * the keys and values deserialized on demand, whenever accessed.
-     * <p>
-     * Returned iterator supports {@code remove} operation which delegates to
-     * <code>removex(Object, org.apache.ignite.lang.IgnitePredicate[])}</code> method.
-     *
-     * @return Iterator over keys.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException;
-
-    /**
      * Forces this cache node to re-balance its partitions. This method is usually used when
      * {@link CacheConfiguration#getRebalanceDelay()} configuration parameter has non-zero value.
      * When many nodes are started or stopped almost concurrently, it is more efficient to delay



Mime
View raw message