ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/17] ignite git commit: ignite-2620 Fixed entry expiration handling (cherry picked from commit 46892c7)
Date Thu, 26 May 2016 06:30:35 GMT
ignite-2620 Fixed entry expiration handling
(cherry picked from commit 46892c7)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69a0afa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69a0afa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69a0afa7

Branch: refs/heads/ignite-3116
Commit: 69a0afa79d2bb175908f7aaf53a1f45915477e99
Parents: e47440d
Author: sboikov <sboikov@gridgain.com>
Authored: Tue May 24 00:07:05 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue May 24 00:22:06 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     | 393 +++++++++--------
 .../processors/cache/GridCacheSwapManager.java  |  11 +-
 .../processors/cache/GridCacheTtlManager.java   |  21 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   2 +-
 .../continuous/CacheContinuousQueryManager.java |   5 +-
 ...niteCacheExpireAndUpdateConsistencyTest.java | 437 +++++++++++++++++++
 .../IgniteCacheExpiryPolicyAbstractTest.java    |   3 +-
 .../IgniteCacheExpiryPolicyTestSuite.java       |   3 +
 .../CacheOperationsWithExpirationTest.java      | 355 +++++++++++++++
 .../cache/GridCacheOffheapIndexGetSelfTest.java |  48 ++
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 12 files changed, 1088 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 73a9dbf..31bd887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -912,9 +912,10 @@ public interface GridCacheEntryEx {
      * Callback from ttl processor to cache entry indicating that entry is expired.
      *
      * @param obsoleteVer Version to set obsolete if entry is expired.
-     * @return {@code True} if this entry was obsolete or became obsolete as a result of this call.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     * @return {@code True} if this entry was expired as a result of this call.
      */
-    public boolean onTtlExpired(GridCacheVersion obsoleteVer);
+    public boolean onTtlExpired(GridCacheVersion obsoleteVer) throws GridCacheEntryRemovedException;
 
     /**
      * @return Time to live, without accounting for transactions or removals.

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 72e2ddd..480403e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -479,6 +479,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         return unswap(true);
     }
 
+    /** {@inheritDoc} */
+    @Nullable @Override public CacheObject unswap(boolean needVal)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
+        return unswap(needVal, true);
+    }
+
     /**
      * Unswaps an entry.
      *
@@ -486,13 +492,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable @Override public CacheObject unswap(boolean needVal)
+    @Nullable private CacheObject unswap(boolean needVal, boolean checkExpire)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        boolean swapEnabled = cctx.swap().swapEnabled();
-
-        if (!swapEnabled && !cctx.isOffHeapEnabled())
+        if (!cctx.isSwapOrOffheapEnabled())
             return null;
 
+        boolean obsolete = false;
+        boolean deferred = false;
+        GridCacheVersion ver0 = null;
+
         synchronized (this) {
             checkObsolete();
 
@@ -509,7 +517,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             flags |= IS_OFFHEAP_PTR_MASK;
 
                             if (needVal) {
-                                CacheObject val = cctx.fromOffheap(offHeapPointer(), false);
+                                CacheObject val = cctx.fromOffheap(e.offheapPointer(), false);
 
                                 e.value(val);
                             }
@@ -528,28 +536,49 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 // If there is a value.
                 if (e != null) {
-                    long delta = e.expireTime() == 0 ? 0 : e.expireTime() - U.currentTimeMillis();
+                    long delta = checkExpire ? (e.expireTime() == 0 ? 0 : e.expireTime() - U.currentTimeMillis()) : 0;
 
-                    if (delta >= 0) {
-                        CacheObject val = e.value();
+                    CacheObject val = e.value();
 
-                        val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+                    val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                        // Set unswapped value.
-                        update(val, e.expireTime(), e.ttl(), e.version(), false);
+                    // Set unswapped value.
+                    update(val, e.expireTime(), e.ttl(), e.version(), false);
 
-                        // Must update valPtr again since update() will reset it.
-                        if (cctx.offheapTiered() && e.offheapPointer() > 0)
-                            offHeapPointer(e.offheapPointer());
+                    // Must update valPtr again since update() will reset it.
+                    if (cctx.offheapTiered() && e.offheapPointer() > 0)
+                        offHeapPointer(e.offheapPointer());
 
+                    if (delta >= 0)
                         return val;
+                    else {
+                        CacheObject expiredVal = rawGetOrUnmarshal(false);
+
+                        if (onExpired(expiredVal, null)) {
+                            if (cctx.deferredDelete()) {
+                                deferred = true;
+                                ver0 = ver;
+                            }
+                            else
+                                obsolete = true;
+                        }
                     }
-                    else
-                        clearIndex(e.value());
                 }
             }
         }
 
+        if (obsolete) {
+            onMarkedObsolete();
+
+            cctx.cache().removeEntry(this);
+        }
+
+        if (deferred) {
+            assert ver0 != null;
+
+            cctx.onDeferredDelete(this, ver0);
+        }
+
         return null;
     }
 
@@ -562,20 +591,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && (hasValueUnlocked() || swapNeeded) && !detached()) {
             assert Thread.holdsLock(this);
 
-            long expireTime = expireTimeExtras();
+            boolean offheapPtr = hasOffHeapPointer();
 
-            if (expireTime > 0 && U.currentTimeMillis() >= expireTime) { // Don't swap entry if it's expired.
-                // Entry might have been updated.
-                if (cctx.offheapTiered()) {
-                    cctx.swap().removeOffheap(key);
-
-                    offHeapPointer(0);
-                }
-
-                return;
-            }
-
-            if (cctx.offheapTiered() && hasOffHeapPointer() && !swapNeeded) {
+            if (cctx.offheapTiered() && offheapPtr && !swapNeeded) {
                 if (log.isDebugEnabled())
                     log.debug("Value did not change, skip write swap entry: " + this);
 
@@ -605,9 +623,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 valBytes.get2(),
                 ver,
                 ttlExtras(),
-                expireTime,
+                expireTimeExtras(),
                 keyClsLdrId,
-                valClsLdrId);
+                valClsLdrId,
+                !offheapPtr);
 
             flags &= ~IS_SWAPPING_REQUIRED;
 
@@ -646,9 +665,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      */
     protected final void releaseSwap() throws IgniteCheckedException {
         if (cctx.isSwapOrOffheapEnabled()) {
-            synchronized (this) {
-                cctx.swap().remove(key());
-            }
+            assert Thread.holdsLock(this);
+
+            cctx.swap().remove(key(), partition());
 
             if (log.isDebugEnabled())
                 log.debug("Removed swap entry [entry=" + this + ']');
@@ -758,132 +777,74 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         GridCacheMvccCandidate owner;
 
-        CacheObject old;
-        CacheObject ret = null;
+        CacheObject ret;
 
         GridCacheVersion startVer;
         GridCacheVersion resVer = null;
 
-        boolean expired = false;
-
-        CacheObject expiredVal = null;
-
-        boolean hasOldBytes;
+        boolean obsolete = false;
+        boolean deferred = false;
+        GridCacheVersion ver0 = null;
 
         synchronized (this) {
             checkObsolete();
 
-            // Cache version for optimistic check.
-            startVer = ver;
-
             GridCacheMvcc mvcc = mvccExtras();
 
             owner = mvcc == null ? null : mvcc.anyOwner();
 
-            double delta;
-
-            long expireTime = expireTimeExtras();
-
-            if (expireTime > 0) {
-                delta = expireTime - U.currentTimeMillis();
-
-                if (log.isDebugEnabled())
-                    log.debug("Checked expiration time for entry [timeLeft=" + delta + ", entry=" + this + ']');
-
-                if (delta <= 0)
-                    expired = true;
-            }
-
-            CacheObject val = this.val;
+            boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
 
-            hasOldBytes = hasOffHeapPointer();
+            CacheObject val;
 
-            if ((unmarshal || isOffHeapValuesOnly()) && !expired && val == null && hasOldBytes)
+            if (valid) {
                 val = rawGetOrUnmarshalUnlocked(tmp);
 
-            boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
-
-            // Attempt to load from swap.
-            if (val == null && !hasOldBytes && readSwap) {
-                // Only promote when loading initial state.
-                if (isNew() || !valid) {
-                    // If this entry is already expired (expiration time was too low),
-                    // we simply remove from swap and clear index.
-                    if (expired) {
-                        releaseSwap();
-
-                        // Previous value is guaranteed to be null
-                        clearIndex(null);
-                    }
-                    else {
-                        // Read and remove swap entry.
+                if (val == null && readSwap) {
+                    if (isStartVersion()) {
                         if (tmp) {
-                            unswap(false);
+                            unswap(false, false);
 
                             val = rawGetOrUnmarshalUnlocked(true);
                         }
                         else
-                            val = unswap();
+                            val = unswap(true, false);
+                    }
+                }
 
-                        // Recalculate expiration after swap read.
-                        if (expireTime > 0) {
-                            delta = expireTime - U.currentTimeMillis();
+                if (val != null) {
+                    long expireTime = expireTimeExtras();
 
-                            if (log.isDebugEnabled())
-                                log.debug("Checked expiration time for entry [timeLeft=" + delta +
-                                    ", entry=" + this + ']');
+                    if (expireTime > 0 && (expireTime - U.currentTimeMillis() <= 0)) {
+                        if (onExpired((CacheObject)cctx.unwrapTemporary(val), null)) {
+                            val = null;
+                            evt = false;
 
-                            if (delta <= 0)
-                                expired = true;
+                            if (cctx.deferredDelete()) {
+                                deferred = true;
+                                ver0 = ver;
+                            }
+                            else
+                                obsolete = true;
                         }
                     }
                 }
             }
+            else
+                val = null;
 
-            old = expired || !valid ? null : val;
-
-            if (expired) {
-                expiredVal = val;
+            ret = val;
 
-                value(null);
-            }
-
-            if (old == null && !hasOldBytes) {
+            if (ret == null) {
                 if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
                     cctx.cache().metrics0().onRead(false);
             }
             else {
                 if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
                     cctx.cache().metrics0().onRead(true);
-
-                // Set retVal here for event notification.
-                ret = old;
-            }
-
-            if (evt && expired) {
-                if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
-                    cctx.events().addEvent(partition(),
-                        key,
-                        tx,
-                        owner,
-                        EVT_CACHE_OBJECT_EXPIRED,
-                        null,
-                        false,
-                        expiredVal,
-                        expiredVal != null || hasOldBytes,
-                        subjId,
-                        null,
-                        taskName,
-                        keepBinary);
-                }
-
-                cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
-
-                // No more notifications.
-                evt = false;
             }
 
-            if (evt && !expired && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+            if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
                 cctx.events().addEvent(
                     partition(),
                     key,
@@ -892,8 +853,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     EVT_CACHE_OBJECT_READ,
                     ret,
                     ret != null,
-                    old,
-                    hasOldBytes || old != null,
+                    ret,
+                    ret != null,
                     subjId,
                     transformClo != null ? transformClo.getClass().getName() : null,
                     taskName,
@@ -912,18 +873,30 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 if (resVer == null)
                     ret = null;
             }
+
+            // Cache version for optimistic check.
+            startVer = ver;
         }
 
         if (ret != null) {
             assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
+            assert !obsolete;
+            assert !deferred;
 
             // If return value is consistent, then done.
             return retVer ? new T2<>(ret, resVer) : ret;
         }
 
-        boolean loadedFromStore = false;
+        if (obsolete) {
+            onMarkedObsolete();
+
+            throw new GridCacheEntryRemovedException();
+        }
+
+        if (deferred)
+            cctx.onDeferredDelete(this, ver0);
 
-        if (ret == null && readThrough) {
+        if (readThrough) {
             IgniteInternalTx tx0 = null;
 
             if (tx != null && tx.local()) {
@@ -939,10 +912,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             Object storeVal = readThrough(tx0, key, false, subjId, taskName);
 
             ret = cctx.toCacheObject(storeVal);
-
-            loadedFromStore = true;
         }
 
+        if (ret == null && !evt)
+            return null;
+
         synchronized (this) {
             long ttl = ttlExtras();
 
@@ -958,9 +932,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                     long expTime = CU.toExpireTime(ttl);
 
-                    if (loadedFromStore)
-                        // Update indexes before actual write to entry.
-                        updateIndex(ret, expTime, nextVer, prevVal);
+                    // Update indexes before actual write to entry.
+                    updateIndex(ret, expTime, nextVer, prevVal);
 
                     boolean hadValPtr = hasOffHeapPointer();
 
@@ -982,8 +955,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         EVT_CACHE_OBJECT_READ,
                         ret,
                         ret != null,
-                        old,
-                        hasOldBytes,
+                        null,
+                        false,
                         subjId,
                         transformClo != null ? transformClo.getClass().getName() : null,
                         taskName,
@@ -1143,7 +1116,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             boolean startVer = isStartVersion();
 
             if (startVer)
-                unswap(retval);
+                unswap(retval, false);
 
             newVer = explicitVer != null ? explicitVer : tx == null ?
                 nextVersion() : tx.writeVersion();
@@ -1555,7 +1528,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             // Load and remove from swap if it is new.
             if (isNew())
-                unswap(retval);
+                unswap(retval, false);
 
             // Possibly get old value form store.
             old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
@@ -1918,8 +1891,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             checkObsolete();
 
             // Load and remove from swap if it is new.
-            if (isNew())
-                unswap(retval);
+            if (isStartVersion())
+                unswap(retval, false);
 
             Object transformClo = null;
 
@@ -2777,19 +2750,31 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public boolean markObsoleteIfEmpty(@Nullable GridCacheVersion ver) throws IgniteCheckedException {
+    @Override public boolean markObsoleteIfEmpty(@Nullable GridCacheVersion obsoleteVer) throws IgniteCheckedException {
         boolean obsolete = false;
         boolean deferred = false;
+        GridCacheVersion ver0 = null;
 
         try {
             synchronized (this) {
                 if (obsoleteVersionExtras() != null)
                     return false;
 
-                if (!hasValueUnlocked() || checkExpired()) {
-                    if (ver == null)
-                        ver = nextVersion();
+                if (hasValueUnlocked()) {
+                    long expireTime = expireTimeExtras();
 
+                    if (expireTime > 0 && (expireTime - U.currentTimeMillis() <= 0)) {
+                        if (onExpired(rawGetOrUnmarshal(false), obsoleteVer)) {
+                            if (cctx.deferredDelete()) {
+                                deferred = true;
+                                ver0 = ver;
+                            }
+                            else
+                                obsolete = true;
+                        }
+                    }
+                }
+                else {
                     if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) {
                         if (!deletedUnlocked()) {
                             update(null, 0L, 0L, ver, true);
@@ -2797,10 +2782,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             deletedUnlocked(true);
 
                             deferred = true;
+                            ver0 = ver;
                         }
                     }
-                    else
-                        obsolete = markObsolete0(ver, true, null);
+                    else {
+                        if (obsoleteVer == null)
+                            obsoleteVer = nextVersion();
+
+                        obsolete = markObsolete0(obsoleteVer, true, null);
+                    }
                 }
             }
         }
@@ -2809,7 +2799,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 onMarkedObsolete();
 
             if (deferred)
-                cctx.onDeferredDelete(this, ver);
+                cctx.onDeferredDelete(this, ver0);
         }
 
         return obsolete;
@@ -3434,7 +3424,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         boolean isNew = isStartVersion();
 
-        CacheObject val = isNew ? unswap(true) : rawGetOrUnmarshalUnlocked(false);
+        CacheObject val = isNew ? unswap(true, false) : rawGetOrUnmarshalUnlocked(false);
 
         return new GridCachePlainVersionedEntry<>(cctx.unwrapBinaryIfNeeded(key, keepBinary, true),
             cctx.unwrapBinaryIfNeeded(val, keepBinary, true),
@@ -3677,57 +3667,34 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onTtlExpired(GridCacheVersion obsoleteVer) {
+    @Override public boolean onTtlExpired(GridCacheVersion obsoleteVer) throws GridCacheEntryRemovedException {
+        assert obsoleteVer != null;
+
         boolean obsolete = false;
         boolean deferred = false;
         GridCacheVersion ver0 = null;
 
         try {
             synchronized (this) {
-                CacheObject expiredVal = saveOldValueUnlocked(false);
-
-                boolean hasOldBytes = hasOffHeapPointer();
-
-                boolean expired = checkExpired();
-
-                if (expired) {
-                    if (!obsolete()) {
-                        if (cctx.deferredDelete() && !detached() && !isInternal()) {
-                            if (!deletedUnlocked()) {
-                                update(null, 0L, 0L, ver0 = ver, true);
+                checkObsolete();
 
-                                deletedUnlocked(true);
+                if (isStartVersion())
+                    unswap(true, false);
 
-                                deferred = true;
-                            }
-                        }
-                        else {
-                            if (markObsolete0(obsoleteVer, true, null))
-                                obsolete = true; // Success, will return "true".
-                        }
-                    }
+                long expireTime = expireTimeExtras();
 
-                    clearIndex(expiredVal);
+                if (expireTime == 0 || (expireTime - U.currentTimeMillis() > 0))
+                    return false;
 
-                    releaseSwap();
+                CacheObject expiredVal = rawGetOrUnmarshal(false);
 
-                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
-                        cctx.events().addEvent(partition(),
-                            key,
-                            cctx.localNodeId(),
-                            null,
-                            EVT_CACHE_OBJECT_EXPIRED,
-                            null,
-                            false,
-                            expiredVal,
-                            expiredVal != null || hasOldBytes,
-                            null,
-                            null,
-                            null,
-                            true);
+                if (onExpired(expiredVal, obsoleteVer)) {
+                    if (cctx.deferredDelete()) {
+                        deferred = true;
+                        ver0 = ver;
                     }
-
-                    cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
+                    else
+                        obsolete = true;
                 }
             }
         }
@@ -3751,7 +3718,63 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 cctx.cache().metrics0().onEvict();
         }
 
-        return obsolete;
+        return true;
+    }
+
+    /**
+     * @param expiredVal Expired value.
+     * @param obsoleteVer Version.
+     * @return {@code True} if entry was marked as removed.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean onExpired(CacheObject expiredVal, GridCacheVersion obsoleteVer) throws IgniteCheckedException {
+        assert expiredVal != null;
+
+        boolean rmvd = false;
+
+        if (mvccExtras() != null)
+            return false;
+
+        if (cctx.deferredDelete() && !detached() && !isInternal()) {
+            if (!deletedUnlocked() && !isStartVersion()) {
+                update(null, 0L, 0L, ver, true);
+
+                deletedUnlocked(true);
+
+                rmvd = true;
+            }
+        }
+        else {
+            if (obsoleteVer == null)
+                obsoleteVer = nextVersion();
+
+            if (markObsolete0(obsoleteVer, true, null))
+                rmvd = true;
+        }
+
+        clearIndex(expiredVal);
+
+        releaseSwap();
+
+        if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
+            cctx.events().addEvent(partition(),
+                key,
+                cctx.localNodeId(),
+                null,
+                EVT_CACHE_OBJECT_EXPIRED,
+                null,
+                false,
+                expiredVal,
+                expiredVal != null,
+                null,
+                null,
+                null,
+                true);
+        }
+
+        cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
+
+        return rmvd;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 127f1be..a060c8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1224,9 +1224,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to remove.
+     * @param part Partition.
      * @throws IgniteCheckedException If failed.
      */
-    public void remove(final KeyCacheObject key) throws IgniteCheckedException {
+    public void remove(final KeyCacheObject key, int part) throws IgniteCheckedException {
         if (!offheapEnabled && !swapEnabled)
             return;
 
@@ -1234,8 +1235,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        int part = cctx.affinity().partition(key);
-
         // First try offheap.
         if (offheapEnabled) {
             byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
@@ -1282,6 +1281,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param expireTime Swap entry expiration time.
      * @param keyClsLdrId Class loader ID for entry key.
      * @param valClsLdrId Class loader ID for entry value.
+     * @param wasUnswapped {@code True} if the value is currently removed from swap.
      * @throws IgniteCheckedException If failed.
      */
     void write(KeyCacheObject key,
@@ -1291,7 +1291,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         long ttl,
         long expireTime,
         @Nullable IgniteUuid keyClsLdrId,
-        @Nullable IgniteUuid valClsLdrId)
+        @Nullable IgniteUuid valClsLdrId,
+        boolean wasUnswapped)
         throws IgniteCheckedException {
         if (!offheapEnabled && !swapEnabled)
             return;
@@ -1323,7 +1324,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr.enabled())
+        if (wasUnswapped && qryMgr.enabled())
             qryMgr.onSwap(key);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 4e1fc6d..3e4561b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -128,7 +128,26 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
                 if (log.isTraceEnabled())
                     log.trace("Trying to remove expired entry from cache: " + e);
 
-                e.entry.onTtlExpired(obsoleteVer);
+                GridCacheEntryEx entry = e.entry;
+
+                boolean touch = false;
+
+                while (true) {
+                    try {
+                        if (entry.onTtlExpired(obsoleteVer))
+                            touch = false;
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException e0) {
+                        entry = entry.context().cache().entryEx(entry.key());
+
+                        touch = true;
+                    }
+                }
+
+                if (touch)
+                    entry.context().evicts().touch(entry, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index c4312b5..2c33fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -569,7 +569,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    cctx.swap().remove(key);
+                    cctx.swap().remove(key, id);
 
                     if (isLocStore)
                         cctx.store().remove(null, key.value(cctx.cacheObjectContext(), false));

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fafb830..b042249 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -363,8 +363,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         if (F.isEmpty(lsnrCol))
             return;
 
-        if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE)) {
-            boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
+        boolean primary = cctx.affinity().primary(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE);
+
+        if (cctx.isReplicated() || primary) {
             boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
             boolean initialized = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpireAndUpdateConsistencyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpireAndUpdateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpireAndUpdateConsistencyTest.java
new file mode 100644
index 0000000..7f54a83
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpireAndUpdateConsistencyTest.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
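+/**
+ * Checks that continuous query events stay consistent with cache updates while entries expire concurrently.
+ */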
+public class IgniteCacheExpireAndUpdateConsistencyTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final int NODES = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER); // Every node of this test uses the same IP finder instance.
+        cfg.setClientMode(client); // Reflects the current value of the 'client' field.
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1); // Servers take indexes 0..NODES-2; the last index is the client.
+
+        client = true;
+
+        Ignite clientNode = startGrid(NODES - 1);
+
+        assertTrue(clientNode.configuration().isClientMode());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic1() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(ATOMIC, ONHEAP_TIERED, 0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic2() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(ATOMIC, ONHEAP_TIERED, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic3() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(ATOMIC, ONHEAP_TIERED, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheap() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(ATOMIC, OFFHEAP_TIERED, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx1() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(TRANSACTIONAL, ONHEAP_TIERED, 0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx2() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(TRANSACTIONAL, ONHEAP_TIERED, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx3() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(TRANSACTIONAL, ONHEAP_TIERED, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheap() throws Exception {
+        updateAndEventConsistencyTest(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED, 1));
+    }
+
+    /**
+     * @param ccfg Configuration of the cache that is created for the test and destroyed afterwards.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private void updateAndEventConsistencyTest(CacheConfiguration<TestKey, TestValue> ccfg) throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            List<ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>>> nodesEvts = new ArrayList<>(); // One map per node.
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite ignite = ignite(i);
+
+                IgniteCache<TestKey, TestValue> cache = ignite.cache(ccfg.getName());
+
+                ContinuousQuery<TestKey, TestValue> qry = new ContinuousQuery<>();
+
+                final ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>> allEvts = new ConcurrentHashMap<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<TestKey, TestValue>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends TestKey, ? extends TestValue>> evts) {
+                        for (CacheEntryEvent<? extends TestKey, ? extends TestValue> e : evts) {
+                            List<T2<TestValue, TestValue>> keyEvts = allEvts.get(e.getKey());
+
+                            if (keyEvts == null) {
+                                List<T2<TestValue, TestValue>> old =
+                                    allEvts.putIfAbsent(e.getKey(), keyEvts = new ArrayList<>());
+
+                                assertNull(old); // Fails if another list was registered for this key in between.
+                            }
+
+                            synchronized (keyEvts) { // The same list is read under this lock by checkEvents().
+                                keyEvts.add(new T2<>(e.getValue(), e.getOldValue())); // Pair is (new value, old value).
+                            }
+                        }
+                    }
+                });
+
+                cache.query(qry); // NOTE(review): the returned cursor is not kept or closed here - confirm this is intended.
+
+                nodesEvts.add(allEvts);
+            }
+
+            final AtomicInteger keyVal = new AtomicInteger(); // Shared counter the runs draw their keys from.
+
+            for (int i = 0; i < NODES; i++) {
+                Ignite ignite = ignite(i);
+
+                log.info("Test with node: " + ignite.name());
+
+                updateAndEventConsistencyTest(ignite, ccfg.getName(), keyVal, nodesEvts, false);
+
+                if (ccfg.getAtomicityMode() == TRANSACTIONAL) // Transactional caches get a second run with explicit transactions.
+                    updateAndEventConsistencyTest(ignite, ccfg.getName(), keyVal, nodesEvts, true);
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param node Node used to perform the updates.
+     * @param cacheName Cache name.
+     * @param keyVal Key counter; every call draws fresh keys from it.
+     * @param nodesEvts Per-node maps of received events; the list is left intact for subsequent runs.
+     * @param useTx If {@code true} executes update with explicit transaction.
+     * @throws Exception If failed.
+     */
+    private void updateAndEventConsistencyTest(final Ignite node,
+        String cacheName,
+        final AtomicInteger keyVal,
+        List<ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>>> nodesEvts,
+        final boolean useTx) throws Exception {
+        final ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>> updates = new ConcurrentHashMap<>();
+
+        final int THREADS = 5;
+        final int KEYS_PER_THREAD = 100;
+
+        final IgniteCache<TestKey, TestValue> cache = node.cache(cacheName);
+
+        final IgniteCache<TestKey, TestValue> expPlcCache =
+            cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(SECONDS, 2)));
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                List<TestKey> keys = new ArrayList<>();
+
+                for (int i = 0; i < KEYS_PER_THREAD; i++)
+                    keys.add(new TestKey(keyVal.incrementAndGet()));
+
+                for (TestKey key : keys) {
+                    expPlcCache.put(key, new TestValue(0));
+
+                    List<T2<TestValue, TestValue>> keyUpdates = new ArrayList<>();
+
+                    keyUpdates.add(new T2<>(new TestValue(0), (TestValue)null));
+
+                    updates.put(key, keyUpdates);
+                }
+
+                long stopTime = U.currentTimeMillis() + 10_000;
+
+                int val = 0;
+
+                Set<TestKey> expired = new HashSet<>();
+
+                IgniteTransactions txs = node.transactions();
+
+                while (U.currentTimeMillis() < stopTime) {
+                    val++;
+
+                    TestValue newVal = new TestValue(val);
+
+                    for (TestKey key : keys) {
+                        Transaction tx = useTx ? txs.txStart(PESSIMISTIC, REPEATABLE_READ) : null;
+
+                        TestValue oldVal = cache.getAndPut(key, newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        List<T2<TestValue, TestValue>> keyUpdates = updates.get(key);
+
+                        keyUpdates.add(new T2<>(newVal, oldVal));
+
+                        if (oldVal == null) // No previous value: the entry had expired before this update.
+                            expired.add(key);
+                    }
+
+                    if (expired.size() == keys.size())
+                        break;
+                }
+
+                assertEquals(keys.size(), expired.size());
+            }
+        }, THREADS, "update-thread");
+
+        // Do not clear 'nodesEvts' here: the caller passes the same list to every subsequent run, and an
+        // emptied list would make those runs check nothing. Keys are unique per run, so old events are harmless.
+        for (ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>> evts : nodesEvts)
+            checkEvents(updates, evts);
+    }
+
+    /**
+     * @param updates Expected updates per key, in the order they were recorded.
+     * @param evts Events received by one node's listener.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private void checkEvents(ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>> updates,
+        final ConcurrentMap<TestKey, List<T2<TestValue, TestValue>>> evts) throws Exception {
+        for (final TestKey key : updates.keySet()) {
+            final List<T2<TestValue, TestValue>> keyUpdates = updates.get(key);
+
+            assertFalse(F.isEmpty(keyUpdates)); // JUnit assertion: also effective when JVM assertions are disabled.
+
+            GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    List<T2<TestValue, TestValue>> keyEvts = evts.get(key);
+
+                    if (keyEvts == null)
+                        return false;
+
+                    synchronized (keyEvts) {
+                        return keyEvts.size() == keyUpdates.size();
+                    }
+                }
+            }, 5000);
+
+            List<T2<TestValue, TestValue>> keyEvts = evts.get(key);
+
+            assertNotNull(keyEvts);
+
+            synchronized (keyEvts) { // The listener appends to this list under the same lock.
+                assertEquals("Unexpected events count [key=" + key + ']', keyUpdates.size(), keyEvts.size());
+                for (int i = 0; i < keyUpdates.size(); i++) {
+                    assertEquals(keyUpdates.get(i).get1(), keyEvts.get(i).get1());
+                    assertEquals(keyUpdates.get(i).get2(), keyEvts.get(i).get2());
+                }
+            }
+        }
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode of the cache under test.
+     * @param memoryMode Memory mode of the cache under test.
+     * @param backups Backups count.
+     * @return Configuration of a partitioned cache with full-sync writes.
+     */
+    private CacheConfiguration<TestKey, TestValue> cacheConfiguration(CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        int backups) {
+        CacheConfiguration<TestKey, TestValue> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setBackups(backups);
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    static class TestKey implements Serializable {
+        /** */
+        private int key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            return key == testKey.key;
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestValue implements Serializable {
+        /** */
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue testVal = (TestValue)o;
+
+            return val == testVal.val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index f4cc025..794519a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -1124,7 +1124,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
                     if (e != null && e.deleted()) {
                         assertEquals(0, e.ttl());
 
-                        assertFalse(cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+                        assertFalse("Invalid entry [e=" + e + ", node=" + i + ']',
+                            cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
 
                         continue;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index e6e2a0e..44dc8ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.expiry;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.cache.store.IgniteCacheExpiryStoreLoadSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpireAndUpdateConsistencyTest;
 
 /**
  *
@@ -65,6 +66,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheClientNearCacheExpiryTest.class);
 
+        suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class);
+
         return suite;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationsWithExpirationTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationsWithExpirationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationsWithExpirationTest.java
new file mode 100644
index 0000000..6c60586
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheOperationsWithExpirationTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+
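+/**
+ * Runs cache puts, reads, removes and SQL queries concurrently while entries expire after short TTLs.
+ */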
+public class CacheOperationsWithExpirationTest extends GridCommonAbstractTest {
+    /** */
+    private static final int KEYS = 10_000;
+
+    /**
+     * @param atomicityMode Atomicity mode of the cache under test.
+     * @param memoryMode Memory mode of the cache under test.
+     * @param offheapMem Value passed to {@code setOffHeapMaxMemory}.
+     * @param idx If {@code true}, registers {@code String} / {@code TestIndexedType} as indexed types.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<String, TestIndexedType> cacheConfiguration(CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        long offheapMem,
+        boolean idx) {
+        CacheConfiguration<String, TestIndexedType> cfg = new CacheConfiguration<>();
+
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setOffHeapMaxMemory(offheapMem);
+        cfg.setSwapEnabled(false);
+        cfg.setBackups(1);
+        cfg.setAtomicWriteOrderMode(PRIMARY);
+        cfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+        cfg.setStatisticsEnabled(true);
+
+        if (idx)
+            cfg.setIndexedTypes(String.class, TestIndexedType.class);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapLimitedIndexEnabled() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, OFFHEAP_TIERED, 1024 * 1024, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapIndexEnabled() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, OFFHEAP_TIERED, 0, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicIndexEnabled() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, ONHEAP_TIERED, 0, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapLimitedIndexEnabled() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED, 1024 * 1024, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapIndexEnabled() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED, 0, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapLimited() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, OFFHEAP_TIERED, 1024 * 1024, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheap() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, OFFHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(ATOMIC, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheapLimited() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED, 1024 * 1024, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOffheap() throws Exception {
+        concurrentPutGetRemoveExpireAndQuery(cacheConfiguration(TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @param ccfg Configuration of the cache that is created for the test and destroyed afterwards.
+     * @throws Exception If failed.
+     */
+    private void concurrentPutGetRemoveExpireAndQuery(CacheConfiguration<String, TestIndexedType> ccfg)
+        throws Exception {
+        Ignite ignite = ignite(0);
+
+        final IgniteCache<String, TestIndexedType> cache = ignite.createCache(ccfg);
+
+        final boolean qryEnabled = !F.isEmpty(ccfg.getQueryEntities()); // Queries run only if query entities are configured.
+
+        try {
+            final long stopTime = U.currentTimeMillis() + 30_000; // Threads keep working for 30 seconds.
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    while (U.currentTimeMillis() < stopTime) {
+                        if (!qryEnabled || idx % 2 == 0) // With queries enabled, even indexes update and odd indexes query.
+                            putGet(cache);
+                        else
+                            query(cache);
+                    }
+                }
+
+                void putGet(IgniteCache<String, TestIndexedType> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    cache = cache.withExpiryPolicy(new ModifiedExpiryPolicy(
+                        new Duration(MILLISECONDS, rnd.nextLong(100) + 1))); // Duration of 1..100 ms.
+
+                    for (int i = 0; i < KEYS; i++) {
+                        String key = String.valueOf(rnd.nextInt(KEYS));
+
+                        cache.put(key, testValue(rnd));
+                    }
+
+                    Set<String> s = new TreeSet<>();
+
+                    for (int i = 0; i < 1000; i++) {
+                        String key = String.valueOf(rnd.nextInt(KEYS));
+
+                        s.add(key);
+                    }
+
+                    cache.getAll(s);
+
+                    cache.removeAll(s);
+                }
+
+                void query(IgniteCache<String, TestIndexedType> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    String k = String.valueOf(rnd.nextInt(KEYS));
+
+                    SqlFieldsQuery qry1;
+
+                    if (rnd.nextBoolean()) {
+                        qry1 = new SqlFieldsQuery("select _key, _val from TestIndexedType where key1=? and intVal=?");
+
+                        qry1.setArgs(k, k); // NOTE(review): a String is bound to the int field 'intVal' too - confirm intended.
+                    }
+                    else
+                        qry1 = new SqlFieldsQuery("select _key, _val from TestIndexedType");
+
+                    List res = cache.query(qry1).getAll();
+
+                    assertNotNull(res); // Only checks that the query completes; the rows are not validated.
+                }
+            }, 10, "test-thread");
+        }
+        finally {
+            ignite.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random.
+     * @return Test value built from a random int and a string payload of random length.
+     */
+    private TestIndexedType testValue(ThreadLocalRandom rnd) {
+        StringBuilder builder = new StringBuilder();
+
+        for (int i = 0, cnt = rnd.nextInt(100); i < cnt; i++) // Draw the repeat count once, not on every iteration.
+            builder.append("string");
+
+        return new TestIndexedType(rnd.nextInt(KEYS), builder.toString());
+    }
+
+    /**
+     * Three-constant enum used for the {@code type1} field of {@link TestIndexedType}.
+     */
+    public enum EnumType1 {
+        /** */
+        TYPE1,
+        /** */
+        TYPE2,
+        /** */
+        TYPE3;
+
+        /** Array returned by {@code values()}, stored once and indexed by the test value constructor. */
+        static final EnumType1[] vals = EnumType1.values();
+    }
+
+    /**
+     * Four-constant enum used for the {@code type2} field of {@link TestIndexedType}.
+     */
+    public enum EnumType2 {
+        /** */
+        TYPE1,
+        /** */
+        TYPE2,
+        /** */
+        TYPE3,
+        /** */
+        TYPE4;
+
+        /** Array returned by {@code values()}, stored once and indexed by the test value constructor. */
+        static final EnumType2[] vals = EnumType2.values();
+    }
+
+    /**
+     * Cache value with several SQL fields, all derived from one int and one string.
+     */
+    public static class TestIndexedType implements Serializable {
+        /** */
+        private static final long serialVersionUID = 1L;
+
+        /** Decimal string form of {@code intVal}; SQL field with no index requested. */
+        @QuerySqlField
+        private final String key;
+
+        /** Same value as {@code key}; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final String key1;
+
+        /** String passed to the constructor; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final String key2;
+
+        /** Same value as {@code key2}; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final String key3;
+
+        /** Int passed to the constructor; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final int intVal;
+
+        /** Constant selected by {@code intVal} modulo the constants count; not annotated as SQL field. */
+        private final EnumType1 type1;
+
+        /** Constant selected by {@code intVal} modulo the constants count; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final EnumType2 type2;
+
+        /** Date created from {@code intVal}; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final Date date1;
+
+        /** Date created from {@code U.currentTimeMillis()} at construction; indexed SQL field. */
+        @QuerySqlField(index = true)
+        private final Date date2;
+
+        /** {@code intVal} cast to byte; not annotated as SQL field. */
+        private final Byte byteVal1;
+
+        /** Same value as {@code byteVal1}. */
+        private final Byte byteVal2;
+
+        /**
+         * @param rnd Random value.
+         * @param strVal Random string value.
+         */
+        public TestIndexedType(int rnd, String strVal) {
+            intVal = rnd;
+            key = String.valueOf(rnd);
+            key1 = key;
+            key2 = strVal;
+            key3 = strVal;
+            date1 = new Date(rnd);
+            date2 = new Date(U.currentTimeMillis());
+            type1 = EnumType1.vals[rnd % EnumType1.vals.length];
+            type2 = EnumType2.vals[rnd % EnumType2.vals.length];
+            byteVal1 = (byte)rnd;
+            byteVal2 = (byte)rnd;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java
index 02c4312..2fbde41 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapIndexGetSelfTest.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -156,6 +158,52 @@ public class GridCacheOffheapIndexGetSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testWithExpiryPolicy() throws Exception {
+        IgniteCache<Long, Long> cache = grid(0).cache(null);
+
+        cache = cache.withExpiryPolicy(new TestExiryPolicy()); // Puts, gets and the query below use this instance.
+
+        for (long i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        for (long i = 0; i < 100; i++)
+            assertEquals((Long)i, cache.get(i));
+
+        SqlQuery<Long, Long> qry = new SqlQuery<>(Long.class, "_val >= 90");
+
+        List<Cache.Entry<Long, Long>> res = cache.query(qry).getAll();
+
+        assertEquals(10, res.size()); // Of the stored values 0..99, exactly 90..99 satisfy the predicate.
+
+        for (Cache.Entry<Long, Long> e : res) {
+            assertNotNull(e.getKey());
+            assertNotNull(e.getValue());
+        }
+    }
+
+    /**
+     * Expiry policy returning fixed durations: one minute on creation, five minutes on access, twenty on update.
+     */
+    private static class TestExiryPolicy implements ExpiryPolicy {
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            return Duration.ONE_MINUTE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            return Duration.FIVE_MINUTES;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            return Duration.TWENTY_MINUTES;
+        }
+    }
+
+    /**
      * Test entry class.
      */
     private static class TestEntity implements Serializable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/69a0afa7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 929fb37..a85b7a6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
 import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
+import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest;
 import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
@@ -74,6 +75,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
         suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class);
         suite.addTestSuite(IgniteCacheStarvationOnRebalanceTest.class);
+        suite.addTestSuite(CacheOperationsWithExpirationTest.class);
 
         return suite;
     }


Mime
View raw message