ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [20/50] [abbrv] ignite git commit: ignite-971 Fixed offheap to swap eviction, added failover tests with swap/offheap, added retries for tx 'check backup' rollback.
Date Fri, 18 Sep 2015 02:29:28 GMT
ignite-971 Fixed offheap to swap eviction, added failover tests with swap/offheap, added retries for tx 'check backup' rollback.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a7490a6e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a7490a6e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a7490a6e

Branch: refs/heads/ignite-843
Commit: a7490a6e48d4c9f1e85a3ae08f97d6c5ced7a71b
Parents: eb7d2b0
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Sep 15 12:46:48 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Sep 15 12:46:48 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheMetricsImpl.java      |   2 +-
 .../processors/cache/GridCacheAdapter.java      |  21 +-
 .../processors/cache/GridCacheEntryEx.java      |  13 ++
 .../cache/GridCacheEvictionManager.java         |  18 +-
 .../processors/cache/GridCacheMapEntry.java     |  52 ++++-
 .../cache/GridCacheSwapEntryImpl.java           |  24 +--
 .../processors/cache/GridCacheSwapManager.java  | 190 +++++++++++++++---
 .../processors/cache/GridCacheUtils.java        |  21 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   7 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   9 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   3 +
 .../near/GridNearTxFinishFuture.java            |  32 ++-
 .../cache/transactions/IgniteTxHandler.java     |   9 +-
 .../offheap/GridOffHeapProcessor.java           |  25 ++-
 .../util/offheap/GridOffHeapEvictListener.java  |   5 +
 .../internal/util/offheap/GridOffHeapMap.java   |  13 +-
 .../util/offheap/GridOffHeapMapFactory.java     |  28 +--
 .../util/offheap/GridOffHeapPartitionedMap.java |  12 ++
 .../util/offheap/unsafe/GridUnsafeMap.java      | 128 ++++++++----
 .../unsafe/GridUnsafePartitionedMap.java        |   9 +
 .../cache/CacheSwapUnswapGetTest.java           |  85 +++++++-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   7 +-
 .../GridCacheAbstractRemoveFailureTest.java     | 199 +++++++++++++------
 .../cache/GridCacheMemoryModeSelfTest.java      |   9 +-
 .../cache/GridCachePutAllFailoverSelfTest.java  |  60 ++++++
 .../processors/cache/GridCacheTestEntryEx.java  |   6 +
 .../GridCacheAbstractNodeRestartSelfTest.java   | 149 ++++++++++++--
 .../GridCacheDhtAtomicRemoveFailureTest.java    |  16 +-
 .../dht/GridCacheDhtRemoveFailureTest.java      |  16 +-
 .../dht/GridCacheTxNodeFailureSelfTest.java     |   2 +-
 .../IgniteCacheCrossCacheTxFailoverTest.java    |  53 +++--
 .../IgniteCachePutRetryAbstractSelfTest.java    | 166 ++++++++++++++--
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |   2 +
 ...gniteCachePutRetryTransactionalSelfTest.java |  50 +++--
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  98 +++++++--
 ...tomicPrimaryWriteOrderRemoveFailureTest.java |  15 +-
 .../GridCacheAtomicRemoveFailureTest.java       |  15 +-
 .../GridCacheAtomicNearRemoveFailureTest.java   |  15 +-
 ...cPrimaryWriteOrderNearRemoveFailureTest.java |  15 +-
 .../near/GridCacheNearRemoveFailureTest.java    |  15 +-
 .../GridCachePartitionedNodeRestartTest.java    |   9 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |   9 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |   8 +-
 .../offheap/GridOffHeapMapAbstractSelfTest.java |  16 +-
 .../GridOffHeapMapPerformanceAbstractTest.java  |   4 +-
 ...idOffHeapPartitionedMapAbstractSelfTest.java |  20 ++
 .../unsafe/GridUnsafeMapPerformanceTest.java    |   2 +-
 .../offheap/unsafe/GridUnsafeMapSelfTest.java   |   2 +-
 .../GridOffHeapMapPerformanceAbstractTest.java  |   4 +-
 .../unsafe/GridUnsafeMapPerformanceTest.java    |   2 +-
 .../ignite/testframework/GridTestUtils.java     | 117 +++++++++++
 .../IgniteCacheFailoverTestSuite.java           |   6 -
 .../IgniteCacheFailoverTestSuite3.java          |  62 ++++++
 .../testsuites/IgniteCacheRestartTestSuite.java |  14 +-
 .../IgniteCacheRestartTestSuite2.java           |  47 +++++
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  31 ++-
 .../cache/GridCacheOffHeapAndSwapSelfTest.java  |   4 +
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |   9 +-
 ...QueryOffheapEvictsMultiThreadedSelfTest.java |   5 -
 ...lientQueryReplicatedNodeRestartSelfTest.java |   8 +-
 .../IgniteCacheQueryNodeRestartSelfTest.java    |   4 +-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   8 +-
 62 files changed, 1584 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 1554e07..dfa0217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -843,7 +843,7 @@ public class CacheMetricsImpl implements CacheMetrics {
         offHeapEvicts.incrementAndGet();
 
         if (delegate != null)
-            delegate.onOffHeapRemove();
+            delegate.onOffHeapEvict();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9329e94..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4096,21 +4096,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                     return t;
                 }
-                catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
-                    IgniteTxRollbackCheckedException e) {
+                catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException e) {
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
-                    try {
-                        tx.rollback();
+                    if (!(e instanceof IgniteTxRollbackCheckedException)) {
+                        try {
+                            tx.rollback();
 
-                        e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
-                            tx.xid(), e);
-                    }
-                    catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
-                        U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+                            e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+                                tx.xid(), e);
+                        }
+                        catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+                            U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
 
-                        U.addLastCause(e, e1, log);
+                            U.addLastCause(e, e1, log);
+                        }
                     }
 
                     if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 98e86ed..430590a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -867,6 +867,19 @@ public interface GridCacheEntryEx {
     public void updateTtl(@Nullable GridCacheVersion ver, long ttl);
 
     /**
+     * Tries to do offheap -> swap eviction.
+     *
+     * @param entry Serialized swap entry.
+     * @param evictVer Version when entry was selected for eviction.
+     * @param obsoleteVer Obsolete version.
+     * @throws IgniteCheckedException If failed.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     * @return {@code True} if entry was obsoleted and written to swap.
+     */
+    public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+        throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+    /**
      * @return Value.
      * @throws IgniteCheckedException If failed to read from swap storage.
      * @throws GridCacheEntryRemovedException If entry was removed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index f60c0eb..3e0e2f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -958,7 +958,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
 
-        Set<GridCacheEntryEx> notRemove = null;
+        Set<GridCacheEntryEx> notRmv = null;
 
         Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size());
 
@@ -990,10 +990,10 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                 locked.add(entry);
 
                 if (entry.obsolete()) {
-                    if (notRemove == null)
-                        notRemove = new HashSet<>();
+                    if (notRmv == null)
+                        notRmv = new HashSet<>();
 
-                    notRemove.add(entry);
+                    notRmv.add(entry);
 
                     continue;
                 }
@@ -1004,11 +1004,19 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                 GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer);
 
                 if (swapEntry != null) {
+                    assert entry.obsolete() : entry;
+
                     swapped.add(swapEntry);
 
                     if (log.isDebugEnabled())
                         log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
                 }
+                else if (!entry.obsolete()) {
+                    if (notRmv == null)
+                        notRmv = new HashSet<>();
+
+                    notRmv.add(entry);
+                }
             }
 
             // Batch write to swap.
@@ -1025,7 +1033,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
             // Remove entries and fire events outside the locks.
             for (GridCacheEntryEx entry : locked) {
-                if (entry.obsolete() && (notRemove == null || !notRemove.contains(entry))) {
+                if (entry.obsolete() && (notRmv == null || !notRmv.contains(entry))) {
                     entry.onMarkedObsolete();
 
                     cache.removeEntry(entry);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index eb4d864..f2bb646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -433,6 +433,41 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
+        assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this;
+
+        boolean obsolete;
+
+        synchronized (this) {
+            checkObsolete();
+
+            if (hasReaders() || !isStartVersion())
+                return false;
+
+            GridCacheMvcc mvcc = mvccExtras();
+
+            if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
+                return false;
+
+            if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
+                assert !hasValueUnlocked() : this;
+
+                obsolete = markObsolete0(obsoleteVer, false);
+
+                assert obsolete : this;
+            }
+            else
+                obsolete = false;
+        }
+
+        if (obsolete)
+            onMarkedObsolete();
+
+        return obsolete;
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException {
         return unswap(true);
     }
@@ -536,7 +571,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     log.debug("Value did not change, skip write swap entry: " + this);
 
                 if (cctx.swap().offheapEvictionEnabled())
-                    cctx.swap().enableOffheapEviction(key());
+                    cctx.swap().enableOffheapEviction(key(), partition());
 
                 return;
             }
@@ -2988,7 +3023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         synchronized (this) {
             checkObsolete();
 
-            if (isNew() || (!preload && deletedUnlocked())) {
+            if ((isNew() && !cctx.swap().containsKey(key, partition())) || (!preload && deletedUnlocked())) {
                 long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
 
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
@@ -3643,6 +3678,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             if (F.isEmptyOrNulls(filter)) {
                 synchronized (this) {
+                    if (obsoleteVersionExtras() != null)
+                        return true;
+
                     CacheObject prev = saveValueForIndexUnlocked();
 
                     if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
@@ -3684,6 +3722,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         return false;
 
                     synchronized (this) {
+                        if (obsoleteVersionExtras() != null)
+                            return true;
+
                         if (!v.equals(ver))
                             // Version has changed since entry passed the filter. Do it again.
                             continue;
@@ -3768,6 +3809,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
                 if (!isStartVersion() && hasValueUnlocked()) {
+                    if (cctx.offheapTiered() && hasOffHeapPointer()) {
+                        if (cctx.swap().offheapEvictionEnabled())
+                            cctx.swap().enableOffheapEviction(key(), partition());
+
+                        return null;
+                    }
+
                     IgniteUuid valClsLdrId = null;
                     IgniteUuid keyClsLdrId = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index 81490a7..b7c66d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -126,9 +126,9 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
      * @return Version.
      */
     public static GridCacheVersion version(byte[] bytes) {
-        int off = VERSION_OFFSET; // Skip ttl, expire time.
+        long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
 
-        boolean verEx = bytes[off++] != 0;
+        boolean verEx = UNSAFE.getByte(bytes, off++) != 0;
 
         return U.readVersion(bytes, off, verEx);
     }
@@ -157,26 +157,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
         return new IgniteBiTuple<>(valBytes, type);
     }
 
-    /**
-     * @param bytes Entry bytes.
-     * @return Value bytes offset.
-     */
-    public static int valueOffset(byte[] bytes) {
-        assert bytes.length > 40 : bytes.length;
-
-        int off = VERSION_OFFSET; // Skip ttl, expire time.
-
-        boolean verEx = bytes[off++] != 0;
-
-        off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
-
-        off += 5; // Byte array flag + array size.
-
-        assert bytes.length >= off;
-
-        return off;
-    }
-
     /** {@inheritDoc} */
     @Override public byte[] valueBytes() {
         if (valBytes != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 7fd6013..9b6381e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionAware;
@@ -54,6 +55,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.swapspace.SwapKey;
@@ -101,8 +103,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     private final ReferenceQueue<Iterator<Map.Entry>> itQ = new ReferenceQueue<>();
 
     /** Soft iterator set. */
-    private final Collection<GridWeakIterator<Map.Entry>> itSet =
-        new GridConcurrentHashSet<>();
+    private final Collection<GridWeakIterator<Map.Entry>> itSet = new GridConcurrentHashSet<>();
+
+    /** {@code True} if offheap to swap eviction is possible. */
+    private boolean offheapToSwapEvicts;
+
+    /** Values to be evicted from offheap to swap. */
+    private ThreadLocal<Collection<IgniteBiTuple<byte[], byte[]>>> offheapEvicts = new ThreadLocal<>();
+
+    /** First offheap eviction warning flag. */
+    private volatile boolean firstEvictWarn;
 
     /**
      * @param enabled Flag to indicate if swap is enabled.
@@ -127,9 +137,58 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     *
+     */
+    public void unwindOffheapEvicts() {
+        if (!offheapToSwapEvicts)
+            return;
+
+        Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
+
+        if (evicts != null) {
+            GridCacheVersion obsoleteVer = cctx.versions().next();
+
+            for (IgniteBiTuple<byte[], byte[]> t : evicts) {
+                try {
+                    byte[] kb = t.get1();
+                    byte[] vb = t.get2();
+
+                    GridCacheVersion evictVer = GridCacheSwapEntryImpl.version(vb);
+
+                    KeyCacheObject key = cctx.toCacheKeyObject(kb);
+
+                    while (true) {
+                        GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+                        try {
+                            if (entry.offheapSwapEvict(vb, evictVer, obsoleteVer))
+                                cctx.cache().removeEntry(entry);
+
+                            break;
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            // Retry.
+                        }
+                    }
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    // Skip entry.
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to unmarshal off-heap entry", e);
+                }
+            }
+
+            offheapEvicts.set(null);
+        }
+    }
+
+    /**
      * Initializes off-heap space.
      */
     private void initOffHeap() {
+        assert offheapEnabled;
+
         // Register big data usage.
         long max = cctx.config().getOffHeapMaxMemory();
 
@@ -137,43 +196,69 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int parts = cctx.config().getAffinity().partitions();
 
-        GridOffHeapEvictListener lsnr = !swapEnabled && !offheapEnabled ? null : new GridOffHeapEvictListener() {
-            private volatile boolean firstEvictWarn;
+        GridOffHeapEvictListener lsnr;
 
-            @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
-                try {
-                    if (!firstEvictWarn)
-                        warnFirstEvict();
+        if (swapEnabled) {
+            offheapToSwapEvicts = true;
 
-                    writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+            lsnr = new GridOffHeapEvictListener() {
+                @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
+                    assert offheapToSwapEvicts;
 
-                    if (cctx.config().isStatisticsEnabled())
-                        cctx.cache().metrics0().onOffHeapEvict();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
-                }
-            }
+                    onOffheapEvict();
 
-            private void warnFirstEvict() {
-                synchronized (this) {
-                    if (firstEvictWarn)
-                        return;
+                    Collection<IgniteBiTuple<byte[], byte[]>> evicts = offheapEvicts.get();
 
-                    firstEvictWarn = true;
+                    if (evicts == null)
+                        offheapEvicts.set(evicts = new ArrayList<>());
+
+                    evicts.add(new IgniteBiTuple<>(kb, vb));
                 }
 
-                U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " +
-                    "cache configuration [cache=" + cctx.name() +
-                    ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']',
-                    "Off-heap evictions started: " + cctx.name());
-            }
-        };
+                @Override public boolean removeEvicted() {
+                    return false;
+                }
+            };
+        }
+        else {
+            lsnr = new GridOffHeapEvictListener() {
+                @Override public void onEvict(int part, int hash, byte[] kb, byte[] vb) {
+                    onOffheapEvict();
+                }
+
+                @Override public boolean removeEvicted() {
+                    return true;
+                }
+            };
+        }
 
         offheap.create(spaceName, parts, init, max, lsnr);
     }
 
     /**
+     * Warns on first evict from off-heap.
+     */
+    private void onOffheapEvict() {
+        if (cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onOffHeapEvict();
+
+        if (firstEvictWarn)
+            return;
+
+        synchronized (this) {
+            if (firstEvictWarn)
+                return;
+
+            firstEvictWarn = true;
+        }
+
+        U.warn(log, "Off-heap evictions started. You may wish to increase 'offHeapMaxMemory' in " +
+            "cache configuration [cache=" + cctx.name() +
+            ", offHeapMaxMemory=" + cctx.config().getOffHeapMaxMemory() + ']',
+            "Off-heap evictions started: " + cctx.name());
+    }
+
+    /**
      * @return {@code True} if swap store is enabled.
      */
     public boolean swapEnabled() {
@@ -440,17 +525,16 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to check.
+     * @param part Partition.
      * @return {@code True} if key is contained.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean containsKey(KeyCacheObject key) throws IgniteCheckedException {
+    public boolean containsKey(KeyCacheObject key, int part) throws IgniteCheckedException {
         if (!offheapEnabled && !swapEnabled)
             return false;
 
         checkIteratorQueue();
 
-        int part = cctx.affinity().partition(key);
-
         // First check off-heap store.
         if (offheapEnabled) {
             boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
@@ -480,6 +564,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to read.
+     * @param keyBytes Key bytes.
      * @param part Key partition.
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
@@ -966,6 +1051,46 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param key Key to move from offheap to swap.
+     * @param entry Serialized swap entry.
+     * @param part Partition.
+     * @param ver Expected entry version.
+     * @return {@code True} if removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    boolean offheapSwapEvict(final KeyCacheObject key, byte[] entry, int part, final GridCacheVersion ver)
+        throws IgniteCheckedException {
+        assert offheapEnabled;
+
+        checkIteratorQueue();
+
+        boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()),
+            new IgniteBiPredicate<Long, Integer>() {
+                @Override public boolean apply(Long ptr, Integer len) {
+                    GridCacheVersion ver0 = GridCacheOffheapSwapEntry.version(ptr);
+
+                    return ver.equals(ver0);
+                }
+            }
+        );
+
+        if (rmv) {
+            Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
+
+            if (lsnrs != null) {
+                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+
+                for (GridCacheSwapListener lsnr : lsnrs)
+                    lsnr.onEntryUnswapped(part, key, e);
+            }
+
+            cctx.swap().writeToSwap(part, key, entry);
+        }
+
+        return rmv;
+    }
+
+    /**
      * @return {@code True} if offheap eviction is enabled.
      */
     boolean offheapEvictionEnabled() {
@@ -976,16 +1101,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * Enables eviction for offheap entry after {@link #readOffheapPointer} was called.
      *
      * @param key Key.
+     * @param part Partition.
      * @throws IgniteCheckedException If failed.
      */
-    void enableOffheapEviction(final KeyCacheObject key) throws IgniteCheckedException {
+    void enableOffheapEviction(final KeyCacheObject key, int part) throws IgniteCheckedException {
         if (!offheapEnabled)
             return;
 
         checkIteratorQueue();
 
-        int part = cctx.affinity().partition(key);
-
         offheap.enableEviction(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 980971c..2d5698a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1029,8 +1029,15 @@ public class GridCacheUtils {
 
         ctx.evicts().unwind();
 
-        if (ctx.isNear())
-            ctx.near().dht().context().evicts().unwind();
+        ctx.swap().unwindOffheapEvicts();
+
+        if (ctx.isNear()) {
+            GridCacheContext dhtCtx = ctx.near().dht().context();
+
+            dhtCtx.evicts().unwind();
+
+            dhtCtx.swap().unwindOffheapEvicts();
+        }
 
         ctx.ttl().expire();
     }
@@ -1041,14 +1048,8 @@ public class GridCacheUtils {
     public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) {
         assert ctx != null;
 
-        for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
-            cacheCtx.evicts().unwind();
-
-            if (cacheCtx.isNear())
-                cacheCtx.near().dht().context().evicts().unwind();
-
-            cacheCtx.ttl().expire();
-        }
+        for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts())
+            unwindEvicts(cacheCtx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c5f15cd..956f2bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -262,8 +262,11 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
         map.put(entry.key(), entry);
 
-        if (!entry.isInternal())
+        if (!entry.isInternal()) {
+            assert !entry.deleted() : entry;
+
             mapPubSize.increment();
+        }
     }
 
     /**
@@ -271,7 +274,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
     void onRemoved(GridDhtCacheEntry entry) {
-        assert entry.obsolete();
+        assert entry.obsolete() : entry;
 
         // Make sure to remove exactly this entry.
         synchronized (entry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5e183e9..fcb012f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -546,10 +546,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param updateSeq Update sequence.
      * @return Local partition.
      */
-    private GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, boolean create, boolean updateSeq) {
-        while (true) {
-            boolean belongs = cctx.affinity().localNode(p, topVer);
+    private GridDhtLocalPartition localPartition(int p,
+        AffinityTopologyVersion topVer,
+        boolean create,
+        boolean updateSeq) {
+        boolean belongs = create && cctx.affinity().localNode(p, topVer);
 
+        while (true) {
             GridDhtLocalPartition loc = locParts.get(p);
 
             if (loc != null && loc.state() == EVICTED) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 3f9decf..2048fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -234,6 +234,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
     /**
      * Completeness callback.
+     *
+     * @return {@code True} if future was finished by this call.
      */
     private boolean onComplete() {
         Throwable err0 = err.get();
@@ -457,6 +459,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
     /**
      * @param reads Read entries.
      * @param writes Write entries.
+     * @throws IgniteCheckedException If failed.
      */
     private void prepare(
         Iterable<IgniteTxEntry> reads,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 21aaef2..ab6dc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -404,8 +404,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 if (backup == null) {
                     readyNearMappingFromBackup(mapping);
 
+                    ClusterTopologyCheckedException cause =
+                        new ClusterTopologyCheckedException("Backup node left grid: " + backupId);
+
+                    cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
                     mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                        "(backup has left grid): " + tx.xidVersion()));
+                        "(backup has left grid): " + tx.xidVersion(), cause));
                 }
                 else if (backup.isLocal()) {
                     boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
@@ -414,9 +419,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                     if (committed)
                         mini.onDone(tx);
-                    else
+                    else {
+                        ClusterTopologyCheckedException cause =
+                            new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
+
+                        cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
                         mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                            "(transaction has been rolled back on backup node): " + tx.xidVersion()));
+                            "(transaction has been rolled back on backup node): " + tx.xidVersion(), cause));
+                    }
                 }
                 else {
                     GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
@@ -731,8 +742,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
             readyNearMappingFromBackup(m);
 
-            if (res.checkCommittedError() != null)
-                onDone(res.checkCommittedError());
+            Throwable err = res.checkCommittedError();
+
+            if (err != null) {
+                if (err instanceof IgniteCheckedException) {
+                    ClusterTopologyCheckedException cause =
+                        ((IgniteCheckedException)err).getCause(ClusterTopologyCheckedException.class);
+
+                    if (cause != null)
+                        cause.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+                }
+
+                onDone(err);
+            }
             else
                 onDone(tx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 9efa43a..756672a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1049,6 +1049,7 @@ public class IgniteTxHandler {
      *
      * @param nodeId Node id that originated finish request.
      * @param req Request.
+     * @param committed {@code True} if transaction committed on this node.
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
         if (req.replyRequired()) {
@@ -1057,9 +1058,13 @@ public class IgniteTxHandler {
             if (req.checkCommitted()) {
                 res.checkCommitted(true);
 
-                if (!committed)
+                if (!committed) {
+                    ClusterTopologyCheckedException cause =
+                        new ClusterTopologyCheckedException("Primary node left grid.");
+
                     res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
-                        "(transaction has been rolled back on backup node): " + req.version()));
+                        "(transaction has been rolled back on backup node): " + req.version(), cause));
+                }
             }
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index 024ea7c..492fa07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap;
 import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.marshaller.Marshaller;
 import org.jetbrains.annotations.Nullable;
@@ -261,13 +262,35 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
      * @return {@code true} If succeeded.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException {
+    public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes)
+        throws IgniteCheckedException {
         GridOffHeapPartitionedMap m = offheap(spaceName);
 
         return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes));
     }
 
     /**
+     * Removes value from offheap space for the given key.
+     *
+     * @param spaceName Space name.
+     * @param part Partition.
+     * @param key Key.
+     * @param keyBytes Key bytes.
+     * @param p Value predicate (arguments are value address and value length).
+     * @return {@code true} If succeeded.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean removex(@Nullable String spaceName,
+        int part,
+        KeyCacheObject key,
+        byte[] keyBytes,
+        IgniteBiPredicate<Long, Integer> p) throws IgniteCheckedException {
+        GridOffHeapPartitionedMap m = offheap(spaceName);
+
+        return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes), p);
+    }
+
+    /**
      * Gets iterator over contents of the given space.
      *
      * @param spaceName Space name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
index 4597be8..beafea4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapEvictListener.java
@@ -30,4 +30,9 @@ public interface GridOffHeapEvictListener {
      * @param valBytes Value bytes.
      */
     public void onEvict(int part, int hash, byte[] keyBytes, byte[] valBytes);
+
+    /**
+     * @return {@code True} if entry selected for eviction should be immediately removed.
+     */
+    public boolean removeEvicted();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
index 1fcddd7..d14a582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMap.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.util.offheap;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Off-heap map.
  */
-public interface GridOffHeapMap<K> {
+public interface GridOffHeapMap {
     /**
      * Gets partition this map belongs to.
      *
@@ -102,6 +103,16 @@ public interface GridOffHeapMap<K> {
     public boolean removex(int hash, byte[] keyBytes);
 
     /**
+     * Removes value from off-heap map without returning it.
+     *
+     * @param hash Hash.
+     * @param keyBytes Key bytes.
+     * @param p Value predicate (arguments are value address and value length).
+     * @return {@code True} if value was removed.
+     */
+    public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p);
+
+    /**
      * Puts key and value bytes into the map potentially replacing
      * existing entry.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
index 1a3d219..4dd911f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapMapFactory.java
@@ -32,8 +32,8 @@ public class GridOffHeapMapFactory {
      * @param initCap Initial capacity.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(long initCap) {
-        return new GridUnsafeMap<>(128, 0.75f, initCap, 0, (short)0, null);
+    public static GridOffHeapMap unsafeMap(long initCap) {
+        return new GridUnsafeMap(128, 0.75f, initCap, 0, (short)0, null);
     }
 
     /**
@@ -43,8 +43,8 @@ public class GridOffHeapMapFactory {
      * @param initCap Initial capacity.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap) {
-        return new GridUnsafeMap<>(concurrency, 0.75f, initCap, 0, (short)0, null);
+    public static GridOffHeapMap unsafeMap(int concurrency, long initCap) {
+        return new GridUnsafeMap(concurrency, 0.75f, initCap, 0, (short)0, null);
     }
 
     /**
@@ -55,8 +55,8 @@ public class GridOffHeapMapFactory {
      * @param initCap Initial capacity.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap) {
-        return new GridUnsafeMap<>(concurrency, load, initCap, 0, (short)0, null);
+    public static GridOffHeapMap unsafeMap(int concurrency, float load, long initCap) {
+        return new GridUnsafeMap(concurrency, load, initCap, 0, (short)0, null);
     }
 
     /**
@@ -68,8 +68,8 @@ public class GridOffHeapMapFactory {
      * @param lruStripes Number of LRU stripes.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes) {
-        return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, null);
+    public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes) {
+        return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, null);
     }
 
     /**
@@ -82,9 +82,9 @@ public class GridOffHeapMapFactory {
      * @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(long initCap, long totalMem, short lruStripes,
+    public static GridOffHeapMap unsafeMap(long initCap, long totalMem, short lruStripes,
         @Nullable GridOffHeapEvictListener lsnr) {
-        return new GridUnsafeMap<>(128, 0.75f, initCap, totalMem, lruStripes, lsnr);
+        return new GridUnsafeMap(128, 0.75f, initCap, totalMem, lruStripes, lsnr);
     }
 
     /**
@@ -98,9 +98,9 @@ public class GridOffHeapMapFactory {
      * @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes,
+    public static GridOffHeapMap unsafeMap(int concurrency, long initCap, long totalMem, short lruStripes,
         @Nullable GridOffHeapEvictListener lsnr) {
-        return new GridUnsafeMap<>(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr);
+        return new GridUnsafeMap(concurrency, 0.75f, initCap, totalMem, lruStripes, lsnr);
     }
 
     /**
@@ -115,9 +115,9 @@ public class GridOffHeapMapFactory {
      * @param lsnr Optional eviction listener which gets notified every time an entry is evicted.
      * @return Off-heap map.
      */
-    public static <K> GridOffHeapMap<K> unsafeMap(int concurrency, float load, long initCap, long totalMem,
+    public static <K> GridOffHeapMap unsafeMap(int concurrency, float load, long initCap, long totalMem,
         short lruStripes, @Nullable GridOffHeapEvictListener lsnr) {
-        return new GridUnsafeMap<>(concurrency, load, initCap, totalMem, lruStripes, lsnr);
+        return new GridUnsafeMap(concurrency, load, initCap, totalMem, lruStripes, lsnr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 3afdfa9..c1e1bfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -21,6 +21,7 @@ import java.util.Set;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 
@@ -111,6 +112,17 @@ public interface GridOffHeapPartitionedMap {
     public boolean removex(int p, int hash, byte[] keyBytes);
 
     /**
+     * Removes value from off-heap map without returning it.
+     *
+     * @param part Partition.
+     * @param hash Hash.
+     * @param keyBytes Key bytes.
+     * @param p Value predicate (arguments are value address and value length).
+     * @return {@code True} if value was removed.
+     */
+    public boolean removex(int part, int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p);
+
+    /**
      * Puts key and value bytes into the map potentially replacing
      * existing entry.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
index 40fb3e8..359d36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafeMap.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
@@ -42,7 +43,7 @@ import static org.apache.ignite.internal.util.offheap.GridOffHeapEvent.REHASH;
 /**
  * Off-heap map based on {@code Unsafe} implementation.
  */
-public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
+public class GridUnsafeMap implements GridOffHeapMap {
     /** Header size. */
     private static final int HEADER = 4 /*hash*/ + 4 /*key-size*/  + 4 /*value-size*/ + 8 /*queue-address*/ +
         8 /*next-address*/;
@@ -77,7 +78,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
     private final float load;
 
     /** Segments. */
-    private final Segment<K>[] segs;
+    private final Segment[] segs;
 
     /** Total memory. */
     private final GridUnsafeMemory mem;
@@ -111,6 +112,9 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
     /** LRU poller. */
     private final GridUnsafeLruPoller lruPoller;
 
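+    /** {@code True} if an entry selected for eviction should be immediately removed (always when no evict listener is set). */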
+    private final boolean rmvEvicted;
+
     /**
      * @param concurrency Concurrency.
      * @param load Load factor.
@@ -180,6 +184,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
                 }
             }
         };
+
+        rmvEvicted = evictLsnr == null || evictLsnr.removeEvicted();
     }
 
     /**
@@ -225,6 +231,8 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
         segs = new Segment[size];
 
         init(initCap, size);
+
+        rmvEvicted = evictLsnr == null || evictLsnr.removeEvicted();
     }
 
     /**
@@ -247,7 +255,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
 
         for (int i = 0; i < size; i++) {
             try {
-                segs[i] = new Segment<>(i, cap);
+                segs[i] = new Segment(i, cap);
             }
             catch (GridOffHeapOutOfMemoryException e) {
                 destruct();
@@ -327,6 +335,11 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) {
+        return segmentFor(hash).removex(hash, keyBytes, p);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean put(int hash, byte[] keyBytes, byte[] valBytes) {
         return segmentFor(hash).put(hash, keyBytes, valBytes);
     }
@@ -559,7 +572,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
     /**
      * Segment.
      */
-    private class Segment<K> {
+    private class Segment {
         /** Lock. */
         private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -1009,41 +1022,51 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
                         }
 
                         if (cur != 0) {
-                            long next = Entry.nextAddress(cur, mem);
+                            long qAddr0 = Entry.queueAddress(cur, mem);
 
-                            if (prev != 0)
-                                Entry.nextAddress(prev, next, mem); // Relink.
-                            else {
-                                if (next == 0)
-                                    Bin.clear(binAddr, mem);
-                                else
-                                    Bin.first(binAddr, next, mem);
-                            }
+                            assert qAddr == qAddr0 : "Queue node address mismatch " +
+                                "[qAddr=" + qAddr + ", entryQueueAddr=" + qAddr + ']';
 
                             if (evictLsnr != null) {
                                 keyBytes = Entry.readKeyBytes(cur, mem);
 
-                                // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released.
-//                                valBytes = Entry.readValueBytes(cur, mem);
-                                {
-                                    int keyLen = Entry.readKeyLength(cur, mem);
-                                    int valLen = Entry.readValueLength(cur, mem);
+                                int keyLen = Entry.readKeyLength(cur, mem);
+                                int valLen = Entry.readValueLength(cur, mem);
 
-                                    valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
-                                }
+                                valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
                             }
 
-                            long a;
+                            if (rmvEvicted) {
+                                long a;
 
-                            assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " +
-                                "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']';
+                                assert qAddr == (a = Entry.queueAddress(cur, mem)) : "Queue node address mismatch " +
+                                    "[qAddr=" + qAddr + ", entryQueueAddr=" + a + ']';
 
-                            relSize = Entry.size(cur, mem);
-                            relAddr = cur;
+                                long next = Entry.nextAddress(cur, mem);
 
-                            cnt--;
+                                if (prev != 0)
+                                    Entry.nextAddress(prev, next, mem); // Relink.
+                                else {
+                                    if (next == 0)
+                                        Bin.clear(binAddr, mem);
+                                    else
+                                        Bin.first(binAddr, next, mem);
+                                }
 
-                            totalCnt.decrement();
+                                relSize = Entry.size(cur, mem);
+                                relAddr = cur;
+
+                                cnt--;
+
+                                totalCnt.decrement();
+                            }
+                            else {
+                                boolean clear = Entry.clearQueueAddress(cur, qAddr, mem);
+
+                                assert clear;
+
+                                relSize = Entry.size(cur, mem);
+                            }
                         }
                     }
                 }
@@ -1251,7 +1274,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
          */
         @SuppressWarnings("TooBroadScope")
         byte[] remove(int hash, byte[] keyBytes) {
-            return remove(hash, keyBytes, true);
+            return remove(hash, keyBytes, true, null);
         }
 
         /**
@@ -1260,17 +1283,28 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
          * @return {@code True} if value was removed.
          */
         boolean removex(int hash, byte[] keyBytes) {
-            return remove(hash, keyBytes, false) == EMPTY_BYTES;
+            return remove(hash, keyBytes, false, null) == EMPTY_BYTES;
+        }
+
+        /**
+         * @param hash Hash.
+         * @param keyBytes Key bytes.
+         * @param p Value predicate.
+         * @return {@code True} if value was removed.
+         */
+        boolean removex(int hash, byte[] keyBytes, IgniteBiPredicate<Long, Integer> p) {
+            return remove(hash, keyBytes, false, p) == EMPTY_BYTES;
         }
 
         /**
          * @param hash Hash.
          * @param keyBytes Key bytes.
          * @param retval {@code True} if need removed value.
+         * @param p Value predicate.
          * @return Removed value bytes.
          */
         @SuppressWarnings("TooBroadScope")
-        byte[] remove(int hash, byte[] keyBytes, boolean retval) {
+        byte[] remove(int hash, byte[] keyBytes, boolean retval, @Nullable IgniteBiPredicate<Long, Integer> p) {
             int relSize = 0;
             long relAddr = 0;
             long qAddr = 0;
@@ -1291,6 +1325,19 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
 
                         // If found match.
                         if (Entry.keyEquals(cur, keyBytes, mem)) {
+                            int keyLen = 0;
+                            int valLen = 0;
+
+                            if (p != null) {
+                                keyLen = Entry.readKeyLength(cur, mem);
+                                valLen = Entry.readValueLength(cur, mem);
+
+                                long valPtr = cur + HEADER + keyLen;
+
+                                if (!p.apply(valPtr, valLen))
+                                    return null;
+                            }
+
                             if (prev != 0)
                                 Entry.nextAddress(prev, next, mem); // Relink.
                             else {
@@ -1300,18 +1347,16 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
                                     Bin.first(binAddr, next, mem);
                             }
 
-                            // TODO: GG-8123: Inlined as a workaround. Revert when 7u60 is released.
-//                            valBytes = retval ? Entry.readValueBytes(cur, mem) : EMPTY_BYTES;
-                            {
-                                if (retval) {
-                                    int keyLen = Entry.readKeyLength(cur, mem);
-                                    int valLen = Entry.readValueLength(cur, mem);
-
-                                    valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
+                            if (retval) {
+                                if (keyLen == 0) {
+                                    keyLen = Entry.readKeyLength(cur, mem);
+                                    valLen = Entry.readValueLength(cur, mem);
                                 }
-                                else
-                                    valBytes = EMPTY_BYTES;
+
+                                valBytes = mem.readBytes(cur + HEADER + keyLen, valLen);
                             }
+                            else
+                                valBytes = EMPTY_BYTES;
 
                             // Prepare release of memory.
                             qAddr = Entry.queueAddress(cur, mem);
@@ -1382,8 +1427,7 @@ public class GridUnsafeMap<K> implements GridOffHeapMap<K> {
          * @param keyBytes Key bytes.
          * @return Value pointer.
          */
-        @Nullable
-        IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) {
+        @Nullable IgniteBiTuple<Long, Integer> valuePointer(int hash, byte[] keyBytes) {
             long binAddr = readLock(hash);
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index 070da51..fb8ac14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.offheap.GridOffHeapMap;
 import org.apache.ignite.internal.util.offheap.GridOffHeapPartitionedMap;
 import org.apache.ignite.internal.util.typedef.CX2;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
@@ -198,6 +199,14 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean removex(int part,
+        int hash,
+        byte[] keyBytes,
+        IgniteBiPredicate<Long, Integer> p) {
+        return mapFor(part).removex(hash, keyBytes, p);
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean put(int p, int hash, byte[] keyBytes, byte[] valBytes) {
         return mapFor(p).put(hash, keyBytes, valBytes);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
index 271d8b1..214beb6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -53,6 +54,12 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
     /** */
     private static final long DURATION = 30_000;
 
+    /** */
+    private static final long OFFHEAP_MEM = 1000;
+
+    /** */
+    private static final int MAX_HEAP_SIZE = 100;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -81,7 +88,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
 
         if (memMode == CacheMemoryMode.ONHEAP_TIERED) {
             LruEvictionPolicy plc = new LruEvictionPolicy();
-            plc.setMaxSize(100);
+            plc.setMaxSize(MAX_HEAP_SIZE);
 
             ccfg.setEvictionPolicy(plc);
         }
@@ -89,7 +96,7 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
         if (swap) {
             ccfg.setSwapEnabled(true);
 
-            ccfg.setOffHeapMaxMemory(1000);
+            ccfg.setOffHeapMaxMemory(OFFHEAP_MEM);
         }
         else
             ccfg.setOffHeapMaxMemory(0);
@@ -133,6 +140,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testTxCacheOffheapSwapEvict() throws Exception {
+        swapUnswap(TRANSACTIONAL, CacheMemoryMode.ONHEAP_TIERED, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxCacheOffheapTieredSwapEvict() throws Exception {
+        swapUnswap(TRANSACTIONAL, CacheMemoryMode.OFFHEAP_TIERED, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicCacheOffheapEvict() throws Exception {
         swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, false);
     }
@@ -145,6 +166,20 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicCacheOffheapSwapEvict() throws Exception {
+        swapUnswap(ATOMIC, CacheMemoryMode.ONHEAP_TIERED, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicCacheOffheapTieredSwapEvict() throws Exception {
+        swapUnswap(ATOMIC, CacheMemoryMode.OFFHEAP_TIERED, true);
+    }
+
+    /**
      * @param atomicityMode Cache atomicity mode.
      * @param memMode Cache memory mode.
      * @param swap {@code True} if swap enabled.
@@ -220,12 +255,56 @@ public class CacheSwapUnswapGetTest extends GridCommonAbstractTest {
                 }
             });
 
-            Thread.sleep(DURATION);
+            long endTime = System.currentTimeMillis() + DURATION;
+
+            while (System.currentTimeMillis() < endTime) {
+                Thread.sleep(5000);
+
+                log.info("Cache size [heap=" + cache.localSize(CachePeekMode.ONHEAP) +
+                    ", offheap=" + cache.localSize(CachePeekMode.OFFHEAP) +
+                    ", swap=" + cache.localSize(CachePeekMode.SWAP) +
+                    ", total=" + cache.localSize() +
+                    ", offheapMem=" + cache.metrics().getOffHeapAllocatedSize() + ']');
+            }
 
             done.set(true);
 
             fut.get();
             getFut.get();
+
+            for (Integer key : keys) {
+                String val = cache.get(key);
+
+                assertNotNull(val);
+            }
+
+            int onheapSize = cache.localSize(CachePeekMode.ONHEAP);
+            int offheapSize = cache.localSize(CachePeekMode.OFFHEAP);
+            int swapSize = cache.localSize(CachePeekMode.SWAP);
+            int total = cache.localSize();
+            long offheapMem = cache.metrics().getOffHeapAllocatedSize();
+
+            log.info("Cache size [heap=" + onheapSize +
+                ", offheap=" + offheapSize +
+                ", swap=" + swapSize +
+                ", total=" + total +
+                ", offheapMem=" + offheapMem +  ']');
+
+            assertTrue(total > 0);
+
+            assertEquals(onheapSize + offheapSize + swapSize, total);
+
+            if (memMode == CacheMemoryMode.OFFHEAP_TIERED)
+                assertEquals(0, onheapSize);
+            else
+                assertEquals(MAX_HEAP_SIZE, onheapSize);
+
+            if (swap) {
+                assertTrue(swapSize > 0);
+                assertTrue(offheapMem <= OFFHEAP_MEM);
+            }
+            else
+                assertEquals(0, swapSize);
         }
         finally {
             done.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 3e646d3..2a64963 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -2825,7 +2825,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             try {
                 cache.clear();
 
-                assertEquals(vals.get(first), cache.localPeek(first, ONHEAP));
+                GridCacheContext<String, Integer> cctx = context(0);
+
+                GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(first) :
+                    cctx.cache().peekEx(first);
+
+                assertNotNull(entry);
             }
             finally {
                 lock.unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
index 31488e0..647746e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
@@ -21,38 +21,50 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  * Tests that removes are not lost when topology changes.
  */
-public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstractSelfTest {
+public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
     /** */
     private static final int GRID_CNT = 3;
 
     /** Keys count. */
-    private static final int KEYS_CNT = 10000;
+    private static final int KEYS_CNT = 10_000;
 
     /** Test duration. */
     private static final long DUR = 90 * 1000L;
@@ -66,36 +78,21 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
     /** Start delay. */
     private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000);
 
-    /** Node kill lock (used to prevent killing while cache data is compared). */
-    private final Lock killLock = new ReentrantLock();
-
     /** */
-    private CountDownLatch assertLatch;
-
-    /** */
-    private CountDownLatch updateLatch;
-
-    /** Caches comparison request flag. */
-    private volatile boolean cmp;
-
-    /** */
-    private String sizePropVal;
+    private static String sizePropVal;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true);
 
         if (testClientNode() && getTestGridName(0).equals(gridName))
             cfg.setClientMode(true);
 
-        return cfg;
-    }
+        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
 
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return GRID_CNT;
+        return cfg;
     }
 
     /** {@inheritDoc} */
@@ -104,6 +101,8 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
         sizePropVal = System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);
 
         System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");
+
+        startGrids(GRID_CNT);
     }
 
     /** {@inheritDoc} */
@@ -111,15 +110,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
         super.afterTestsStopped();
 
         System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal != null ? sizePropVal : "");
-    }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        startGrids(gridCount());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
         stopAllGrids();
     }
 
@@ -129,6 +120,28 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
     }
 
     /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Cache atomicity mode.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * @return Near cache configuration.
+     */
+    protected abstract NearCacheConfiguration nearCache();
+
+    /**
+     * @return Atomic cache write order mode.
+     */
+    protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return null;
+    }
+
+    /**
      * @return {@code True} if test updates from client node.
      */
     protected boolean testClientNode() {
@@ -139,9 +152,49 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
      * @throws Exception If failed.
      */
     public void testPutAndRemove() throws Exception {
-        assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode());
+        putAndRemove(DUR, GridTestUtils.TestMemoryMode.HEAP);
+    }
 
-        final IgniteCache<Integer, Integer> sndCache0 = grid(0).cache(null);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAndRemoveOffheapEvict() throws Exception {
+        putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAndRemoveOffheapEvictSwap() throws Exception {
+        putAndRemove(30_000, GridTestUtils.TestMemoryMode.OFFHEAP_EVICT_SWAP);
+    }
+
+    /**
+     * @param duration Test duration.
+     * @param memMode Memory mode.
+     * @throws Exception If failed.
+     */
+    private void putAndRemove(long duration, GridTestUtils.TestMemoryMode memMode) throws Exception {
+        assertEquals(testClientNode(), (boolean) grid(0).configuration().isClientMode());
+
+        grid(0).destroyCache(null);
+
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        ccfg.setCacheMode(cacheMode());
+
+        if (cacheMode() == PARTITIONED)
+            ccfg.setBackups(1);
+
+        ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
+        ccfg.setNearConfiguration(nearCache());
+
+        GridTestUtils.setMemoryMode(null, ccfg, memMode, 100, 1024);
+
+        final IgniteCache<Integer, Integer> sndCache0 = grid(0).createCache(ccfg);
 
         final AtomicBoolean stop = new AtomicBoolean();
 
@@ -152,8 +205,12 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
         // Expected values in cache.
         final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap8<>();
 
+        final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>();
+
         IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
+                Thread.currentThread().setName("update-thread");
+
                 ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                 while (!stop.get()) {
@@ -190,10 +247,14 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
 
                     cntr.addAndGet(100);
 
-                    if (cmp) {
-                        assertLatch.countDown();
+                    CyclicBarrier barrier = cmp.get();
+
+                    if (barrier != null) {
+                        log.info("Wait data check.");
 
-                        updateLatch.await();
+                        barrier.await(60_000, TimeUnit.MILLISECONDS);
+
+                        log.info("Finished wait data check.");
                     }
                 }
 
@@ -203,16 +264,21 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
 
         IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
+                Thread.currentThread().setName("restart-thread");
+
                 while (!stop.get()) {
                     U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2()));
 
-                    killLock.lock();
+                    killAndRestart(stop);
 
-                    try {
-                        killAndRestart(stop);
-                    }
-                    finally {
-                        killLock.unlock();
+                    CyclicBarrier barrier = cmp.get();
+
+                    if (barrier != null) {
+                        log.info("Wait data check.");
+
+                        barrier.await(60_000, TimeUnit.MILLISECONDS);
+
+                        log.info("Finished wait data check.");
                     }
                 }
 
@@ -221,7 +287,7 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
         });
 
         try {
-            long stopTime = DUR + U.currentTimeMillis() ;
+            long stopTime = duration + U.currentTimeMillis() ;
 
             long nextAssert = U.currentTimeMillis() + ASSERT_FREQ;
 
@@ -241,31 +307,34 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
                 log.info("Operations/second: " + opsPerSecond);
 
                 if (U.currentTimeMillis() >= nextAssert) {
-                    updateLatch = new CountDownLatch(1);
+                    CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
+                        @Override public void run() {
+                            try {
+                                cmp.set(null);
 
-                    assertLatch = new CountDownLatch(1);
+                                log.info("Checking cache content.");
 
-                    cmp = true;
+                                assertCacheContent(expVals);
 
-                    killLock.lock();
+                                log.info("Finished check cache content.");
+                            }
+                            catch (Throwable e) {
+                                log.error("Unexpected error: " + e, e);
 
-                    try {
-                        if (!assertLatch.await(60_000, TimeUnit.MILLISECONDS))
-                            throw new IgniteCheckedException("Failed to suspend thread executing updates.");
+                                throw e;
+                            }
+                        }
+                    });
 
-                        log.info("Checking cache content.");
+                    log.info("Start cache content check.");
 
-                        assertCacheContent(expVals);
+                    cmp.set(barrier);
 
-                        nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
-                    }
-                    finally {
-                        killLock.unlock();
+                    barrier.await(60_000, TimeUnit.MILLISECONDS);
 
-                        updateLatch.countDown();
+                    log.info("Cache content check done.");
 
-                        U.sleep(500);
-                    }
+                    nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
                 }
             }
         }
@@ -278,18 +347,17 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
         updateFut.get();
 
         log.info("Test finished. Update errors: " + errCntr.get());
-
     }
 
     /**
      * @param stop Stop flag.
      * @throws Exception If failed.
      */
-    void killAndRestart(AtomicBoolean stop) throws Exception {
+    private void killAndRestart(AtomicBoolean stop) throws Exception {
         if (stop.get())
             return;
 
-        int idx = random(1, gridCount() + 1);
+        int idx = random(1, GRID_CNT + 1);
 
         log.info("Killing node " + idx);
 
@@ -309,10 +377,9 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
 
     /**
      * @param expVals Expected values in cache.
-     * @throws Exception If failed.
      */
     @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"})
-    private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) throws Exception {
+    private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) {
         assert !expVals.isEmpty();
 
         Collection<Integer> failedKeys = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a7490a6e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
index c5c1c39..2f6ee8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java
@@ -232,9 +232,15 @@ public class GridCacheMemoryModeSelfTest extends GridCommonAbstractTest {
         //putAll
         doTest(cache, offheapSwap, offheapEmpty, swapEmpty, new CIX1<IgniteCache<String, Integer>>() {
             @Override public void applyx(IgniteCache<String, Integer> c) throws IgniteCheckedException {
+                putAll(c, 0, all / 2);
+
+                putAll(c, all / 2 + 1, all - 1);
+            }
+
+            private void putAll(IgniteCache<String, Integer> c, int k1, int k2) {
                 Map<String, Integer> m = new HashMap<>();
 
-                for (int i = 0; i < all; i++)
+                for (int i = k1; i <= k2; i++)
                     m.put(valueOf(i), i);
 
                 c.putAll(m);
@@ -264,6 +270,7 @@ public class GridCacheMemoryModeSelfTest extends GridCommonAbstractTest {
         assertEquals(offheapSwap, c.localSize(CachePeekMode.OFFHEAP) + c.localSize(CachePeekMode.SWAP));
 
         info("size: " + c.size());
+        info("heap: " + c.localSize(CachePeekMode.ONHEAP));
         info("offheap: " + c.localSize(CachePeekMode.OFFHEAP));
         info("swap: " + c.localSize(CachePeekMode.SWAP));
 


Mime
View raw message