ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [19/46] ignite git commit: ignite-1089 Fixed tests for multi jvm mode. This closes #846. (cherry picked from commit bf57413)
Date Wed, 20 Jul 2016 09:07:04 GMT
ignite-1089 Fixed tests for multi jvm mode. This closes #846.
(cherry picked from commit bf57413)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d2198a1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d2198a1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d2198a1d

Branch: refs/heads/ignite-1232
Commit: d2198a1df0e5c907434ec1a465eb118823723439
Parents: ade57ea
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 4 21:08:24 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 15 16:21:37 2016 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAbstractFullApiSelfTest.java | 160 ++++++++++--------
 .../IgniteCacheConfigVariationsFullApiTest.java | 157 +++++++++--------
 ...idCacheNearOnlyMultiNodeFullApiSelfTest.java | 168 ++++++++-----------
 .../junits/common/GridCommonAbstractTest.java   |  12 +-
 .../multijvm/IgniteCacheProcessProxy.java       | 109 ++++++------
 5 files changed, 317 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d2198a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 5ab23a0..32d46e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -3559,9 +3560,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      * @throws Exception If failed.
      */
     private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
-        if (isMultiJvm())
-            fail("https://issues.apache.org/jira/browse/IGNITE-1089");
-
         if (memoryMode() == OFFHEAP_TIERED)
             return;
 
@@ -3573,22 +3571,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         final String key = primaryKeysForCache(jcache(), 1).get(0);
 
-        GridCacheAdapter<String, Integer> internalCache = internalCache(fullCache());
-
-        if (internalCache.isNear())
-            internalCache = internalCache.context().near().dht();
-
-        GridCacheEntryEx entry;
+        IgnitePair<Long> entryTtl;
 
         if (oldEntry) {
             c.put(key, 1);
 
-            entry = internalCache.peekEx(key);
+            entryTtl = entryTtl(fullCache(), key);
 
-            assert entry != null;
-
-            assertEquals(0, entry.ttl());
-            assertEquals(0, entry.expireTime());
+            assertNotNull(entryTtl.get1());
+            assertNotNull(entryTtl.get2());
+            assertEquals((Long)0L, entryTtl.get1());
+            assertEquals((Long)0L, entryTtl.get2());
         }
 
         long startTime = System.currentTimeMillis();
@@ -3605,10 +3598,10 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             }
 
             if (oldEntry) {
-                entry = internalCache.peekEx(key);
+                entryTtl = entryTtl(fullCache(), key);
 
-                assertEquals(0, entry.ttl());
-                assertEquals(0, entry.expireTime());
+                assertEquals((Long)0L, entryTtl.get1());
+                assertEquals((Long)0L, entryTtl.get2());
             }
         }
 
@@ -3630,18 +3623,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
-
-                GridCacheEntryEx curEntry = cache.peekEx(key);
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                assertEquals(ttl, curEntry.ttl());
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long) curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
 
-                assert curEntry.expireTime() > startTime;
-
-                expireTimes[i] = curEntry.expireTime();
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3663,18 +3652,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
-
-                GridCacheEntryEx curEntry = cache.peekEx(key);
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                assertEquals(ttl, curEntry.ttl());
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long) curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
 
-                assert curEntry.expireTime() > startTime;
-
-                expireTimes[i] = curEntry.expireTime();
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3696,18 +3681,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                GridCacheEntryEx curEntry = cache.peekEx(key);
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long) curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
 
-                assertEquals(ttl, curEntry.ttl());
-
-                assert curEntry.expireTime() > startTime;
-
-                expireTimes[i] = curEntry.expireTime();
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3733,15 +3714,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                GridCacheEntryEx curEntry = cache.peekEx(key);
-
-                assertEquals(ttl, curEntry.ttl());
-                assertEquals(expireTimes[i], curEntry.expireTime());
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long)curEntryTtl.get1());
+                assertEquals(expireTimes[i], (long)curEntryTtl.get2());
             }
         }
 
@@ -3777,21 +3755,24 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             }
         }, Math.min(ttl * 10, getTestTimeout())));
 
-        if (internalCache.isLocal())
-            return;
+        IgniteCache fullCache = fullCache();
 
-        assert c.get(key) == null;
+        if (!isMultiJvmObject(fullCache)) {
+            GridCacheAdapter internalCache = internalCache(fullCache);
 
-        internalCache = internalCache(fullCache());
+            if (internalCache.isLocal())
+                return;
+        }
 
-        if (internalCache.isNear())
-            internalCache = internalCache.context().near().dht();
+        assert c.get(key) == null;
 
         // Ensure that old TTL and expire time are not longer "visible".
-        entry = internalCache.peekEx(key);
+        entryTtl = entryTtl(fullCache(), key);
 
-        assertEquals(0, entry.ttl());
-        assertEquals(0, entry.expireTime());
+        assertNotNull(entryTtl.get1());
+        assertNotNull(entryTtl.get2());
+        assertEquals(0, (long)entryTtl.get1());
+        assertEquals(0, (long)entryTtl.get2());
 
         // Ensure that next update will not pick old expire time.
 
@@ -3810,12 +3791,14 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         U.sleep(2000);
 
-        entry = internalCache.peekEx(key);
+        entryTtl = entryTtl(fullCache(), key);
 
         assertEquals((Integer)10, c.get(key));
 
-        assertEquals(0, entry.ttl());
-        assertEquals(0, entry.expireTime());
+        assertNotNull(entryTtl.get1());
+        assertNotNull(entryTtl.get2());
+        assertEquals(0, (long)entryTtl.get1());
+        assertEquals(0, (long)entryTtl.get2());
     }
 
     /**
@@ -4319,6 +4302,15 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
+     * @param cache Cache.
+     * @param key Entry key.
+     * @return Pair [ttl, expireTime]; both values null if entry not found
+     */
+    protected IgnitePair<Long> entryTtl(IgniteCache cache, String key) {
+        return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key, true));
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testIterator() throws Exception {
@@ -5613,6 +5605,40 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     /**
      *
      */
+    public static class EntryTtlTask implements TestCacheCallable<String, Integer, IgnitePair<Long>> {
+        /** Entry key. */
+        private final String key;
+
+        /** Check cache for nearness, use DHT cache if it is near. */
+        private final boolean useDhtForNearCache;
+
+        /**
+         * @param key Entry key.
+         * @param useDhtForNearCache Check cache for nearness, use DHT cache if it is near.
+         */
+        public EntryTtlTask(String key, boolean useDhtForNearCache) {
+            this.key = key;
+            this.useDhtForNearCache = useDhtForNearCache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgnitePair<Long> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+            GridCacheAdapter<?, ?> internalCache = internalCache0(cache);
+
+            if (useDhtForNearCache && internalCache.context().isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheEntryEx entry = internalCache.peekEx(key);
+
+            return entry != null ?
+                new IgnitePair<>(entry.ttl(), entry.expireTime()) :
+                new IgnitePair<Long>(null, null);
+        }
+    }
+
+    /**
+     *
+     */
     private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
         /**
          * @param idx Index.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2198a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index 7e6d015..2ca09c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -3462,9 +3463,6 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
      * @throws Exception If failed.
      */
     private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
-        if (isMultiJvm())
-            fail("https://issues.apache.org/jira/browse/IGNITE-1089");
-
         if (memoryMode() == OFFHEAP_TIERED)
             return;
 
@@ -3476,22 +3474,14 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         final String key = primaryKeysForCache(1).get(0);
 
-        GridCacheAdapter<String, Integer> internalCache = internalCache(serverNodeCache());
-
-        if (internalCache.isNear())
-            internalCache = internalCache.context().near().dht();
-
-        GridCacheEntryEx entry;
+        IgnitePair<Long> entryTtl;
 
         if (oldEntry) {
             c.put(key, 1);
 
-            entry = internalCache.peekEx(key);
-
-            assert entry != null;
-
-            assertEquals(0, entry.ttl());
-            assertEquals(0, entry.expireTime());
+            entryTtl = entryTtl(serverNodeCache(), key);
+            assertEquals((Long)0L, entryTtl.get1());
+            assertEquals((Long)0L, entryTtl.get2());
         }
 
         long startTime = System.currentTimeMillis();
@@ -3508,10 +3498,12 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
             }
 
             if (oldEntry) {
-                entry = internalCache.peekEx(key);
+                entryTtl = entryTtl(serverNodeCache(), key);
 
-                assertEquals(0, entry.ttl());
-                assertEquals(0, entry.expireTime());
+                assertNotNull(entryTtl.get1());
+                assertNotNull(entryTtl.get2());
+                assertEquals((Long)0L, entryTtl.get1());
+                assertEquals((Long)0L, entryTtl.get2());
             }
         }
 
@@ -3533,18 +3525,13 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<Object, Object> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                GridCacheEntryEx curEntry = cache.peekEx(key);
-
-                assertEquals(ttl, curEntry.ttl());
-
-                assert curEntry.expireTime() > startTime;
-
-                expireTimes[i] = curEntry.expireTime();
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long)curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3566,18 +3553,13 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<Object, Object> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
-
-                GridCacheEntryEx curEntry = cache.peekEx(key);
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                assertEquals(ttl, curEntry.ttl());
-
-                assert curEntry.expireTime() > startTime;
-
-                expireTimes[i] = curEntry.expireTime();
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long)curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3599,18 +3581,13 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<Object, Object> cache = internalCache(jcache(i));
-
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
-
-                GridCacheEntryEx curEntry = cache.peekEx(key);
-
-                assertEquals(ttl, curEntry.ttl());
-
-                assert curEntry.expireTime() > startTime;
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                expireTimes[i] = curEntry.expireTime();
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long)curEntryTtl.get1());
+                assertTrue(curEntryTtl.get2() > startTime);
+                expireTimes[i] = curEntryTtl.get2();
             }
         }
 
@@ -3636,15 +3613,12 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         for (int i = 0; i < gridCount(); i++) {
             if (grid(i).affinity(cacheName()).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter cache = internalCache(jcache(i));
+                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);
 
-                if (cache.context().isNear())
-                    cache = cache.context().near().dht();
-
-                GridCacheEntryEx curEntry = cache.peekEx(key);
-
-                assertEquals(ttl, curEntry.ttl());
-                assertEquals(expireTimes[i], curEntry.expireTime());
+                assertNotNull(curEntryTtl.get1());
+                assertNotNull(curEntryTtl.get2());
+                assertEquals(ttl, (long)curEntryTtl.get1());
+                assertEquals(expireTimes[i], (long)curEntryTtl.get2());
             }
         }
 
@@ -3680,21 +3654,24 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
             }
         }, Math.min(ttl * 10, getTestTimeout())));
 
-        if (internalCache.isLocal())
-            return;
+        IgniteCache srvNodeCache = serverNodeCache();
 
-        assert c.get(key) == null;
+        if (!isMultiJvmObject(srvNodeCache)) {
+            GridCacheAdapter internalCache = internalCache(srvNodeCache);
 
-        internalCache = internalCache(serverNodeCache());
+            if (internalCache.isLocal())
+                return;
+        }
 
-        if (internalCache.isNear())
-            internalCache = internalCache.context().near().dht();
+        assert c.get(key) == null;
 
         // Ensure that old TTL and expire time are not longer "visible".
-        entry = internalCache.peekEx(key);
+        entryTtl = entryTtl(srvNodeCache, key);
 
-        assertEquals(0, entry.ttl());
-        assertEquals(0, entry.expireTime());
+        assertNotNull(entryTtl.get1());
+        assertNotNull(entryTtl.get2());
+        assertEquals(0, (long)entryTtl.get1());
+        assertEquals(0, (long)entryTtl.get2());
 
         // Ensure that next update will not pick old expire time.
 
@@ -3713,12 +3690,14 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
 
         U.sleep(2000);
 
-        entry = internalCache.peekEx(key);
+        entryTtl = entryTtl(srvNodeCache, key);
 
         assertEquals((Integer)10, c.get(key));
 
-        assertEquals(0, entry.ttl());
-        assertEquals(0, entry.expireTime());
+        assertNotNull(entryTtl.get1());
+        assertNotNull(entryTtl.get2());
+        assertEquals(0, (long)entryTtl.get1());
+        assertEquals(0, (long)entryTtl.get2());
     }
 
     /**
@@ -4300,6 +4279,15 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
     }
 
     /**
+     * @param cache Cache.
+     * @param key Entry key.
+     * @return Pair [ttl, expireTime]; both values null if entry not found
+     */
+    protected IgnitePair<Long> entryTtl(IgniteCache cache, String key) {
+        return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key));
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testIterator() throws Exception {
@@ -5643,6 +5631,33 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar
         }
     }
 
+    /** */
+    private static class EntryTtlTask implements TestCacheCallable<String, Integer, IgnitePair<Long>> {
+        /** */
+        private final String key;
+
+        /**
+         * @param key Key.
+         */
+        private EntryTtlTask(String key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgnitePair<Long> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
+            GridCacheAdapter<?, ?> internalCache = internalCache0(cache);
+
+            if (internalCache.context().isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheEntryEx entry = internalCache.peekEx(key);
+
+            return entry != null ?
+                new IgnitePair<>(entry.ttl(), entry.expireTime()) :
+                new IgnitePair<Long>(null, null);
+        }
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2198a1d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
index 411e1bf..5fab800 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
@@ -39,10 +39,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -211,6 +211,10 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
      * @throws Exception If failed.
      */
     public void testReaderTtlTx() throws Exception {
+        // IgniteProcessProxy#transactions is not implemented.
+        if (isMultiJvm())
+            return;
+
         checkReaderTtl(true);
     }
 
@@ -222,13 +226,19 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
     }
 
     /**
+     * @param cache Cache.
+     * @param key Entry key.
+     * @return Pair [ttl, expireTime] for near cache entry; both values null if entry not found
+     */
+    protected IgnitePair<Long> nearEntryTtl(IgniteCache cache, String key) {
+        return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key, false));
+    }
+
+    /**
      * @param inTx If {@code true} starts explicit transaction.
      * @throws Exception If failed.
      */
     private void checkReaderTtl(boolean inTx) throws Exception {
-        if (isMultiJvm())
-            fail("https://issues.apache.org/jira/browse/IGNITE-1089");
-
         int ttl = 1000;
 
         final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl));
@@ -242,17 +252,14 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         info("Finished first put.");
 
         {
-            GridCacheAdapter<String, Integer> dht = internalCache(fullCache());
-
-            if (dht.context().isNear())
-                dht = dht.context().near().dht();
+            IgnitePair<Long> entryTtl = entryTtl(fullCache(), key);
 
-            GridCacheEntryEx entry = dht.peekEx(key);
+            assertEquals((Integer)1, c.get(key));
 
-            assert entry != null;
-
-            assertEquals(0, entry.ttl());
-            assertEquals(0, entry.expireTime());
+            assertNotNull(entryTtl.get1());
+            assertNotNull(entryTtl.get2());
+            assertEquals(0, (long)entryTtl.get1());
+            assertEquals(0, (long)entryTtl.get2());
         }
 
         long startTime = System.currentTimeMillis();
@@ -278,30 +285,19 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         for (int i = 0; i < gridCount(); i++) {
             info("Checking grid: " + grid(i).localNode().id());
 
-            GridCacheEntryEx entry = null;
-
-            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> dht = internalCache(jcache(i));
+            IgnitePair<Long> entryTtl = null;
 
-                if (dht.context().isNear())
-                    dht = dht.context().near().dht();
+            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key))
+                entryTtl = entryTtl(jcache(i), key);
+            else if (i == nearIdx)
+                entryTtl = nearEntryTtl(jcache(i), key);
 
-                entry = dht.peekEx(key);
-
-                assert entry != null;
-            }
-            else if (i == nearIdx) {
-                GridCacheAdapter<String, Integer> near = internalCache(jcache(i));
-
-                entry = near.peekEx(key);
-
-                assert entry != null;
-            }
-
-            if (entry != null) {
-                assertEquals(ttl, entry.ttl());
-                assert entry.expireTime() > startTime;
-                expireTimes[i] = entry.expireTime();
+            if (entryTtl != null) {
+                assertNotNull(entryTtl.get1());
+                assertNotNull(entryTtl.get2());
+                assertEquals(ttl, (long)entryTtl.get1());
+                assertTrue(entryTtl.get2() > startTime);
+                expireTimes[i] = entryTtl.get2();
             }
         }
 
@@ -322,30 +318,19 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         }
 
         for (int i = 0; i < gridCount(); i++) {
-            GridCacheEntryEx entry = null;
-
-            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> dht = internalCache(jcache(i));
-
-                if (dht.context().isNear())
-                    dht = dht.context().near().dht();
-
-                entry = dht.peekEx(key);
-
-                assert entry != null;
-            }
-            else if (i == nearIdx) {
-                GridCacheAdapter<String, Integer> near = internalCache(jcache(i));
-
-                entry = near.peekEx(key);
-
-                assert entry != null;
-            }
-
-            if (entry != null) {
-                assertEquals(ttl, entry.ttl());
-                assert entry.expireTime() > expireTimes[i];
-                expireTimes[i] = entry.expireTime();
+            IgnitePair<Long> entryTtl = null;
+
+            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key))
+                entryTtl = entryTtl(jcache(i), key);
+            else if (i == nearIdx)
+                entryTtl = nearEntryTtl(jcache(i), key);
+
+            if (entryTtl != null) {
+                assertNotNull(entryTtl.get1());
+                assertNotNull(entryTtl.get2());
+                assertEquals(ttl, (long)entryTtl.get1());
+                assertTrue(entryTtl.get2() > startTime);
+                expireTimes[i] = entryTtl.get2();
             }
         }
 
@@ -363,29 +348,18 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         }
 
         for (int i = 0; i < gridCount(); i++) {
-            GridCacheEntryEx entry = null;
-
-            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) {
-                GridCacheAdapter<String, Integer> dht = internalCache(jcache(i));
-
-                if (dht.context().isNear())
-                    dht = dht.context().near().dht();
-
-                entry = dht.peekEx(key);
-
-                assert entry != null;
-            }
-            else if (i == nearIdx) {
-                GridCacheAdapter<String, Integer> near = internalCache(jcache(i));
-
-                entry = near.peekEx(key);
-
-                assert entry != null;
-            }
-
-            if (entry != null) {
-                assertEquals(ttl, entry.ttl());
-                assertEquals(expireTimes[i], entry.expireTime());
+            IgnitePair<Long> entryTtl = null;
+
+            if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key))
+                entryTtl = entryTtl(jcache(i), key);
+            else if (i == nearIdx)
+                entryTtl = nearEntryTtl(jcache(i), key);
+
+            if (entryTtl != null) {
+                assertNotNull(entryTtl.get1());
+                assertNotNull(entryTtl.get2());
+                assertEquals(ttl, (long)entryTtl.get1());
+                assertEquals(expireTimes[i], (long)entryTtl.get2());
             }
         }
 
@@ -421,17 +395,12 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
 
         // Ensure that old TTL and expire time are not longer "visible".
         {
-            GridCacheAdapter<String, Integer> dht = internalCache(fullCache());
-
-            if (dht.context().isNear())
-                dht = dht.context().near().dht();
+            IgnitePair<Long> entryTtl = entryTtl(fullCache(), key);
 
-            GridCacheEntryEx entry = dht.peekEx(key);
-
-            assert entry != null;
-
-            assertEquals(0, entry.ttl());
-            assertEquals(0, entry.expireTime());
+            assertNotNull(entryTtl.get1());
+            assertNotNull(entryTtl.get2());
+            assertEquals(0, (long)entryTtl.get1());
+            assertEquals(0, (long)entryTtl.get2());
         }
 
         // Ensure that next update will not pick old expire time.
@@ -451,17 +420,12 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         U.sleep(2000);
 
         {
-            GridCacheAdapter<String, Integer> dht = internalCache(fullCache());
-
-            if (dht.context().isNear())
-                dht = dht.context().near().dht();
-
-            GridCacheEntryEx entry = dht.peekEx(key);
-
-            assert entry != null;
+            IgnitePair<Long> entryTtl = entryTtl(fullCache(), key);
 
-            assertEquals(0, entry.ttl());
-            assertEquals(0, entry.expireTime());
+            assertNotNull(entryTtl.get1());
+            assertNotNull(entryTtl.get2());
+            assertEquals(0, (long)entryTtl.get1());
+            assertEquals(0, (long)entryTtl.get2());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2198a1d/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 5b03f8e..87509a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -161,15 +161,23 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @param cache Cache.
      * @return Cache.
      */
-    protected <K, V> GridCacheAdapter<K, V> internalCache(IgniteCache<K, V> cache) {
+    protected static <K, V> GridCacheAdapter<K, V> internalCache0(IgniteCache<K, V> cache) {
         if (isMultiJvmObject(cache))
-            throw new UnsupportedOperationException("Oparetion can't be supported automatically for multi jvm " +
+            throw new UnsupportedOperationException("Operation can't be supported automatically for multi jvm " +
                 "(send closure instead).");
 
         return ((IgniteKernal)cache.unwrap(Ignite.class)).internalCache(cache.getName());
     }
 
     /**
+     * @param cache Cache.
+     * @return Cache.
+     */
+    protected <K, V> GridCacheAdapter<K, V> internalCache(IgniteCache<K, V> cache) {
+        return internalCache0(cache);
+    }
+
+    /**
      * @return Cache.
      */
     protected <K, V> IgniteCache<K, V> jcache() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d2198a1d/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 3be0423..adbf1ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -65,6 +65,9 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     /** With async. */
     private final boolean isAsync;
 
+    /** Expiry policy. */
+    private final ExpiryPolicy expiryPlc;
+
     /** Ignite proxy. */
     private final transient IgniteProcessProxy igniteProxy;
 
@@ -73,24 +76,26 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
      * @param proxy Ignite Process Proxy.
      */
     public IgniteCacheProcessProxy(String name, IgniteProcessProxy proxy) {
-        this(name, false, proxy);
+        this(name, false, null, proxy);
     }
 
     /**
      * @param name Name.
      * @param async Async flag.
+     * @param plc Expiry policy.
      * @param proxy Ignite Process Proxy.
      */
-    public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) {
+    private IgniteCacheProcessProxy(String name, boolean async, ExpiryPolicy plc, IgniteProcessProxy proxy) {
         cacheName = name;
         isAsync = async;
+        expiryPlc = plc;
         igniteProxy = proxy;
         compute = proxy.remoteCompute();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withAsync() {
-        return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy);
+        return new IgniteCacheProcessProxy<>(cacheName, true, null, igniteProxy);
     }
 
     /** {@inheritDoc} */
@@ -116,7 +121,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
-        throw new UnsupportedOperationException("Method should be supported.");
+        return new IgniteCacheProcessProxy<>(cacheName, isAsync, plc, igniteProxy);
     }
 
     /** {@inheritDoc} */
@@ -271,7 +276,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
 
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
-        compute.call(new PutTask<>(cacheName, isAsync, key, val));
+        compute.call(new PutTask<>(cacheName, isAsync, expiryPlc, key, val));
     }
 
     /** {@inheritDoc} */
@@ -449,6 +454,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
+    /** {@inheritDoc} */
     @Override public CacheMetrics localMetrics() {
         throw new UnsupportedOperationException("Method should be supported.");
     }
@@ -458,6 +464,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         throw new UnsupportedOperationException("Method should be supported.");
     }
 
+    /** {@inheritDoc} */
     @Override public CacheMetricsMXBean localMxBean() {
         throw new UnsupportedOperationException("Method should be supported.");
     }
@@ -480,7 +487,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param clazz Clazz.
          */
         public GetConfigurationTask(String cacheName, boolean async, Class<C> clazz) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.clazz = clazz;
         }
 
@@ -507,7 +514,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param args Args.
          */
         public LocalLoadCacheTask(String cacheName, boolean async, IgniteBiPredicate<K, V> p, Object[] args) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.p = p;
             this.args = args;
         }
@@ -537,7 +544,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param val Value.
          */
         public GetAndPutIfAbsentTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.val = val;
         }
@@ -565,7 +572,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param byCurrThread By current thread.
          */
         public IsLocalLockedTask(String cacheName, boolean async, K key, boolean byCurrThread) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.byCurrThread = byCurrThread;
         }
@@ -589,7 +596,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param peekModes Peek modes.
          */
         public LocalEntriesTask(String cacheName, boolean async, CachePeekMode[] peekModes) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.peekModes = peekModes;
         }
 
@@ -617,7 +624,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public LocalEvictTask(String cacheName, boolean async, Collection<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -646,7 +653,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param peekModes Peek modes.
          */
         public LocalPeekTask(String cacheName, boolean async, K key, CachePeekMode[] peekModes) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.peekModes = peekModes;
         }
@@ -674,7 +681,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param loc Local.
          */
         public SizeTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.loc = loc;
             this.peekModes = peekModes;
         }
@@ -702,7 +709,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param loc Local.
          */
         public SizeLongTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.loc = loc;
             this.peekModes = peekModes;
         }
@@ -760,7 +767,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public GetTask(String cacheName, boolean async, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
         }
 
@@ -783,7 +790,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public GetEntryTask(String cacheName, boolean async, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
         }
 
@@ -802,7 +809,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public RemoveAllTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -831,11 +838,12 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         /**
          * @param cacheName Cache name.
          * @param async Async.
+         * @param expiryPlc Expiry policy.
          * @param key Key.
          * @param val Value.
          */
-        public PutTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+        public PutTask(String cacheName, boolean async, ExpiryPolicy expiryPlc, K key, V val) {
+            super(cacheName, async, expiryPlc);
             this.key = key;
             this.val = val;
         }
@@ -861,7 +869,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public ContainsKeyTask(String cacheName, boolean async, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
         }
 
@@ -880,7 +888,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public ClearTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -900,7 +908,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public IteratorTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -931,7 +939,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param val Value.
          */
         public ReplaceTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.val = val;
         }
@@ -951,7 +959,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public GetNameTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -973,7 +981,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public RemoveTask(String cacheName, boolean async, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
         }
 
@@ -996,7 +1004,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param map Map.
          */
         public PutAllTask(String cacheName, boolean async, Map<? extends K, ? extends V> map) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.map = map;
         }
 
@@ -1021,7 +1029,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public RemoveAllKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -1046,7 +1054,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public GetAllTask(String cacheName, boolean async, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -1069,7 +1077,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public GetEntriesTask(String cacheName, boolean async, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -1092,7 +1100,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public GetAllOutTxTask(String cacheName, boolean async, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -1115,7 +1123,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public ContainsKeysTask(String cacheName, boolean async, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
         }
 
@@ -1142,7 +1150,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param val Value.
          */
         public GetAndPutTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.val = val;
         }
@@ -1170,7 +1178,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param val Value.
          */
         public PutIfAbsentTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.val = val;
         }
@@ -1198,7 +1206,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param oldVal Old value.
          */
         public RemoveIfExistsTask(String cacheName, boolean async, K key, V oldVal) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.oldVal = oldVal;
         }
@@ -1222,7 +1230,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public GetAndRemoveTask(String cacheName, boolean async, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
         }
 
@@ -1253,7 +1261,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param newVal New value.
          */
         public ReplaceIfExistsTask(String cacheName, boolean async, K key, V oldVal, V newVal) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.oldVal = oldVal;
             this.newVal = newVal;
@@ -1282,7 +1290,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param val Value.
          */
         public GetAndReplaceTask(String cacheName, boolean async, K key, V val) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.val = val;
         }
@@ -1309,7 +1317,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param key Key.
          */
         public ClearKeyTask(String cacheName, boolean async, boolean loc, K key) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.key = key;
             this.loc = loc;
         }
@@ -1341,7 +1349,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param keys Keys.
          */
         public ClearAllKeys(String cacheName, boolean async, boolean loc, Set<? extends K> keys) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.keys = keys;
             this.loc = loc;
         }
@@ -1379,7 +1387,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          */
         public InvokeTask(String cacheName, boolean async, K key, EntryProcessor<K, V, R> processor,
             Object[] args) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.args = args;
             this.key = key;
             this.processor = processor;
@@ -1413,7 +1421,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          */
         public InvokeAllTask(String cacheName, boolean async, Set<? extends K> keys,
             EntryProcessor<K, V, T> processor, Object[] args) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.args = args;
             this.keys = keys;
             this.processor = processor;
@@ -1434,7 +1442,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public CloseTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -1454,7 +1462,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public DestroyTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -1474,7 +1482,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param async Async.
          */
         public IsClosedTask(String cacheName, boolean async) {
-            super(cacheName, async);
+            super(cacheName, async, null);
         }
 
         /** {@inheritDoc} */
@@ -1496,7 +1504,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
          * @param clazz Clazz.
          */
         public UnwrapTask(String cacheName, boolean async, Class<R> clazz) {
-            super(cacheName, async);
+            super(cacheName, async, null);
             this.clazz = clazz;
         }
 
@@ -1520,21 +1528,28 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
         /** Async. */
         protected final boolean async;
 
+        /** Expiry policy. */
+        protected final ExpiryPolicy expiryPlc;
+
         /**
          * @param cacheName Cache name.
          * @param async Async.
+         * @param expiryPlc Optional expiry policy.
          */
-        public CacheTaskAdapter(String cacheName, boolean async) {
+        public CacheTaskAdapter(String cacheName, boolean async, ExpiryPolicy expiryPlc) {
             this.async = async;
             this.cacheName = cacheName;
+            this.expiryPlc = expiryPlc;
         }
 
         /**
-         * Returns cache instance.
+         * @return Cache instance.
          */
         protected IgniteCache<K, V> cache() {
             IgniteCache<K, V> cache = ignite.cache(cacheName);
 
+            cache = expiryPlc != null ? cache.withExpiryPolicy(expiryPlc) : cache;
+
             return async ? cache.withAsync() : cache;
         }
     }


Mime
View raw message