ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [03/38] incubator-ignite git commit: # ignite-41
Date Tue, 23 Dec 2014 08:22:39 GMT
# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2a51321
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2a51321
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2a51321

Branch: refs/heads/ignite-1
Commit: c2a513218cb832390bdc6b228eaddb10af2c27a4
Parents: 7f1b3f0
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Dec 15 10:29:48 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Dec 15 17:45:23 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  94 +++++----
 .../processors/cache/GridCacheAdapter.java      |  71 +++++--
 .../processors/cache/GridCacheContext.java      |  20 ++
 .../processors/cache/GridCacheEntryEx.java      |   5 +-
 .../processors/cache/GridCacheMapEntry.java     | 138 +++++++++++---
 .../processors/cache/GridCacheProcessor.java    |   2 +-
 .../processors/cache/GridCacheProjectionEx.java |  12 ++
 .../cache/GridCacheProjectionImpl.java          |  83 ++++++--
 .../processors/cache/GridCacheProxyImpl.java    |  11 ++
 .../distributed/GridCacheExpiryPolicy.java      |  88 +++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  59 ++++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  27 ++-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  30 +--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  17 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  37 ++--
 .../atomic/GridNearAtomicUpdateResponse.java    |  41 +---
 .../distributed/near/GridNearAtomicCache.java   |  19 +-
 .../processors/cache/IgniteCacheTest.java       |   2 +
 .../expiry/IgniteCacheExpiryPolicyTest.java     | 191 +++++++++++++++----
 .../processors/cache/GridCacheTestEntryEx.java  |  28 ++-
 20 files changed, 749 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 5971d69..f1d27f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.tostring.*;
@@ -36,25 +35,32 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Context. */
+    /** */
     private GridCacheContext<K, V> ctx;
 
     /** Gateway. */
     private GridCacheGateway<K, V> gate;
 
-    /** Cache. */
+    /** Delegate. */
     @GridToStringInclude
-    private GridCacheAdapter<K, V> delegate;
+    private GridCacheProjectionEx<K, V> delegate;
+
+    /** Projection. */
+    private GridCacheProjectionImpl<K, V> prj;
 
     /**
      * @param delegate Delegate.
+     * @param prj Projection.
      */
-    public IgniteCacheProxy(GridCacheAdapter<K, V> delegate) {
+    public IgniteCacheProxy(GridCacheContext<K, V> ctx,
+        GridCacheProjectionEx<K, V> delegate,
+        @Nullable GridCacheProjectionImpl<K, V> prj) {
+        assert ctx != null;
         assert delegate != null;
 
+        this.ctx = ctx;
         this.delegate = delegate;
-
-        ctx = delegate.context();
+        this.prj = prj;
 
         gate = ctx.gate();
     }
@@ -73,8 +79,16 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            GridCacheProjectionEx<K, V> prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc);
+
+            return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl<K, V>)prj0);
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */
@@ -92,7 +106,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.putIfAbsent(key, val);
@@ -126,7 +140,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public boolean isLocked(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
             return delegate.isLocked(key);
@@ -138,7 +152,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public boolean isLockedByThread(K key) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
             return delegate.isLockedByThread(key);
@@ -162,7 +176,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public void localEvict(Collection<? extends K> keys) {
-        GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
             delegate.evictAll(keys);
@@ -175,13 +189,23 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
         // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        if (peekModes.length != 0)
+            throw new UnsupportedOperationException();
+
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.peek(key);
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void localPromote(Set<? extends K> keys) throws CacheException {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 delegate.promoteAll(keys);
@@ -216,7 +240,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public V get(K key) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.get(key);
@@ -233,7 +257,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public Map<K, V> getAll(Set<? extends K> keys) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.getAll(keys);
@@ -264,10 +288,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.putx(key, val, null);
+                delegate.putx(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -281,7 +305,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public V getAndPut(K key, V val) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.put(key, val);
@@ -298,10 +322,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public void putAll(Map<? extends K, ? extends V> map) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.putAll(map, null);
+                delegate.putAll(map);
             }
             finally {
                 gate.leave(prev);
@@ -315,7 +339,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public boolean putIfAbsent(K key, V val) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.putxIfAbsent(key, val);
@@ -332,7 +356,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public boolean remove(K key) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.removex(key);
@@ -349,7 +373,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public boolean remove(K key, V oldVal) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.remove(key, oldVal);
@@ -366,10 +390,10 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public V getAndRemove(K key) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.remove(key, (GridCacheEntryEx<K, V>)null);
+                return delegate.remove(key);
             }
             finally {
                 gate.leave(prev);
@@ -383,7 +407,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V oldVal, V newVal) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.replace(key, oldVal, newVal);
@@ -400,7 +424,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V val) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.replacex(key, val);
@@ -417,7 +441,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 return delegate.replace(key, val);
@@ -434,7 +458,7 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
     /** {@inheritDoc} */
     @Override public void removeAll(Set<? extends K> keys) {
         try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(null);
+            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
                 delegate.removeAll(keys);
@@ -581,15 +605,17 @@ public class IgniteCacheProxy<K, V> implements IgniteCache<K, V>, Externalizable
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx);
         out.writeObject(delegate);
+        out.writeObject(prj);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        delegate = (GridCacheAdapter<K, V>)in.readObject();
-
-        ctx = delegate.context();
+        ctx = (GridCacheContext<K, V>)in.readObject();
+        delegate = (GridCacheProjectionEx<K, V>)in.readObject();
+        prj = (GridCacheProjectionImpl<K, V>)in.readObject();
 
         gate = ctx.gate();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index bf40a85..559949f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -40,6 +40,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -52,8 +53,6 @@ import static org.gridgain.grid.cache.GridCacheFlag.*;
 import static org.gridgain.grid.cache.GridCachePeekMode.*;
 import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
 import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
-import static org.gridgain.grid.cache.GridCacheTxState.*;
-import static org.apache.ignite.events.IgniteEventType.*;
 import static org.gridgain.grid.kernal.GridClosureCallMode.*;
 import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
 import static org.gridgain.grid.kernal.processors.task.GridTaskThreadContextKey.*;
@@ -367,8 +366,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
 
     /** {@inheritDoc} */
     @Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) {
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null,
-            null, subjId, false);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            ctx,
+            null,
+            null,
+            null,
+            subjId,
+            false,
+            null);
 
         return new GridCacheProxyImpl<>(ctx, prj, prj);
     }
@@ -378,8 +383,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         if (F.isEmpty(flags))
             return this;
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, null, null,
-            EnumSet.copyOf(F.asList(flags)), null, false);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            ctx,
+            null,
+            null,
+            EnumSet.copyOf(F.asList(flags)),
+            null,
+            false,
+            null);
 
         return new GridCacheProxyImpl<>(ctx, prj, prj);
     }
@@ -398,12 +409,31 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             null,
             null,
             null,
-            ctx.portableEnabled());
+            ctx.portableEnabled(),
+            null);
 
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public ExpiryPolicy expiry() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        return new GridCacheProjectionImpl<>(
+            this,
+            ctx,
+            null,
+            null,
+            null,
+            null,
+            ctx.portableEnabled(),
+            plc);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "RedundantCast"})
     @Override public <K1, V1> GridCacheProjection<K1, V1> projection(
         Class<? super K1> keyType,
@@ -423,8 +453,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         }
 
         GridCacheProjectionImpl<K1, V1> prj = new GridCacheProjectionImpl<>((GridCacheProjection<K1, V1>)this,
-            (GridCacheContext<K1, V1>)ctx, CU.<K1, V1>typeFilter(keyType, valType), /*filter*/null, /*flags*/null,
-            /*clientId*/null, false);
+            (GridCacheContext<K1, V1>)ctx,
+            CU.<K1, V1>typeFilter(keyType, valType),
+            /*filter*/null,
+            /*flags*/null,
+            /*clientId*/null,
+            false,
+            null);
 
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, prj, prj);
     }
@@ -443,7 +478,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             }
         }
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, ctx, p, null, null, null, false);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            ctx,
+            p,
+            null,
+            null,
+            null,
+            false,
+            null);
 
         return new GridCacheProxyImpl<>(ctx, prj, prj);
     }
@@ -463,7 +505,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         }
 
         GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(
-            this, ctx, null, filter, null, null, false);
+            this,
+            ctx,
+            null,
+            filter,
+            null,
+            null,
+            false,
+            null);
 
         return new GridCacheProxyImpl<>(ctx, prj, prj);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index c6cb355..58fd1cd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -46,6 +46,8 @@ import org.gridgain.grid.util.offheap.unsafe.*;
 import org.gridgain.grid.util.tostring.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -170,6 +172,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Cache ID. */
     private int cacheId;
 
+    /** */
+    private ExpiryPolicy expiryPlc;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -275,6 +280,20 @@ public class GridCacheContext<K, V> implements Externalizable {
         }
         else
             cacheId = 1;
+
+        Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
+
+        expiryPlc = factory.create();
+
+        if (expiryPlc instanceof EternalExpiryPolicy)
+            expiryPlc = null;
+    }
+
+    /**
+     * @return Cache default {@link ExpiryPolicy}.
+     */
+    @Nullable public ExpiryPolicy expiry() {
+        return expiryPlc;
     }
 
     /**
@@ -1054,6 +1073,7 @@ public class GridCacheContext<K, V> implements Externalizable {
 
     /**
      * Gets thread local projection.
+     *
      * @return Projection per call.
      */
     public GridCacheProjectionImpl<K, V> projectionPerCall() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index e94016f..fc8aaaa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.kernal.processors.dr.*;
 import org.gridgain.grid.util.lang.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.util.*;
 
 /**
@@ -390,7 +391,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
      * @param valBytes Value bytes. Can be non-null only if operation is UPDATE.
      * @param writeThrough Write through flag.
      * @param retval Return value flag.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      * @param evt Event flag.
      * @param metrics Metrics update flag.
      * @param primary If update is performed on primary node (the one which assigns version).
@@ -422,7 +423,7 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
         @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
-        long ttl,
+        @Nullable ExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         boolean primary,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 32c1485..b723d71 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 import sun.misc.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -1615,6 +1616,38 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         return new IgniteBiTuple<>(res, cctx.<V>unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old));
     }
 
+    /**
+     * @param expiryPlc Expiry policy.
+     * @param isNew {@code True} if entry is new.
+     * @return TTL.
+     */
+    private static long ttlFromPolicy(@Nullable ExpiryPolicy expiryPlc, boolean isNew) {
+        if (expiryPlc == null)
+            return -1L;
+
+        Duration duration = isNew ? expiryPlc.getExpiryForCreation() : expiryPlc.getExpiryForUpdate();
+
+        return toTtl(duration);
+    }
+
+    private static long toTtl(Duration duration) {
+        if (duration == null)
+            return -1;
+
+        if (duration.getDurationAmount() == 0) {
+            if (duration.isEternal())
+                return 0;
+
+            assert duration.isZero();
+
+            return 1L;
+        }
+
+        assert duration.getTimeUnit() != null;
+
+        return duration.getTimeUnit().toMillis(duration.getDurationAmount());
+    }
+
     /** {@inheritDoc} */
     @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate(
         GridCacheVersion newVer,
@@ -1625,7 +1658,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         @Nullable byte[] valBytes,
         boolean writeThrough,
         boolean retval,
-        long ttl,
+        @Nullable ExpiryPolicy expiryPlc,
         boolean evt,
         boolean metrics,
         boolean primary,
@@ -1668,7 +1701,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             Object transformClo = null;
 
             if (drResolve) {
-                drRes = cctx.dr().resolveAtomic(this, op, writeObj, valBytes, ttl, drTtl, drExpireTime, drVer);
+                drRes = cctx.dr().resolveAtomic(this,
+                    op,
+                    writeObj,
+                    valBytes,
+                    ttlFromPolicy(expiryPlc, isNew()),
+                    drTtl,
+                    drExpireTime,
+                    drVer);
 
                 if (drRes != null) {
                     if (drRes.isUseOld()) {
@@ -1730,26 +1770,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']';
             }
 
-            if (drRes == null) {
-                // Calculate TTL and expire time for local update.
-                if (drTtl >= 0L) {
-                    assert drExpireTime >= 0L;
-
-                    newTtl = drTtl;
-                    newExpireTime = drExpireTime;
-                }
-                else {
-                    assert drExpireTime == -1L;
-
-                    newTtl = ttl;
-
-                    if (newTtl < 0)
-                        newTtl = ttlExtras();
-
-                    newExpireTime = toExpireTime(newTtl);
-                }
-            }
-
             // Possibly get old value form store.
             old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
 
@@ -1777,7 +1797,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (!F.isEmptyOrNulls(filter)) {
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
-                if (!pass)
+                if (!pass) {
+                    if (!isNew() && expiryPlc != null) {
+                        Duration duration = expiryPlc.getExpiryForAccess();
+
+                        if (duration != null)
+                            updateTtl(toTtl(duration));
+                    }
+
                     return new GridCacheUpdateAtomicResult<>(false,
                         retval ? old : null,
                         null,
@@ -1786,6 +1813,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         null,
                         null,
                         false);
+                }
             }
 
             // Apply metrics.
@@ -1841,6 +1869,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     }
                 }
 
+                if (drRes == null) {
+                    // Calculate TTL and expire time for local update.
+                    if (drTtl >= 0L) {
+                        assert drExpireTime >= 0L;
+
+                        newTtl = drTtl;
+                        newExpireTime = drExpireTime;
+                    }
+                    else {
+                        assert drExpireTime == -1L;
+
+                        if (expiryPlc != null) {
+                            if (!hadVal) {
+                                Duration duration = expiryPlc.getExpiryForCreation();
+
+                                if (duration != null && duration.isZero())
+                                    return new GridCacheUpdateAtomicResult<>(false,
+                                        retval ? old : null,
+                                        null,
+                                        0L,
+                                        -1L,
+                                        null,
+                                        null,
+                                        false);
+
+                                newTtl = toTtl(duration);
+                            }
+                            else
+                                newTtl = toTtl(expiryPlc.getExpiryForUpdate());
+                        }
+                        else
+                            newTtl = -1L;
+
+                        if (newTtl < 0)
+                            newTtl = ttlExtras();
+
+                        newExpireTime = toExpireTime(newTtl);
+                    }
+                }
+
                 // Try write-through.
                 if (writeThrough)
                     // Must persist inside synchronization in non-tx mode.
@@ -1859,7 +1927,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     assert !deletedUnlocked() : "Invalid entry [entry=" + this +
                         ", locNodeId=" + cctx.localNodeId() + ']';
 
-                    // Do not change size;
+                    // Do not change size.
                 }
 
                 if (cctx.portableEnabled())
@@ -2393,6 +2461,28 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /**
+     * @param ttl Time to live.
+     */
+    private void updateTtl(long ttl) {
+        assert Thread.holdsLock(this);
+
+        if (ttl == -1L)
+            return;
+
+        long expireTime = toExpireTime(ttl);
+
+        long oldExpireTime = expireTimeExtras();
+
+        if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+            cctx.ttl().removeTrackedEntry(this);
+
+        ttlAndExpireTimeExtras(ttl, expireTime);
+
+        if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+            cctx.ttl().addTrackedEntry(this);
+    }
+
+    /**
      * @return {@code true} If value bytes should be stored.
      */
     protected boolean isStoreValueBytes() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index a083805..5dd98b3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -1596,7 +1596,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cache == null)
             throw new IllegalArgumentException("Cache is not configured: " + name);
 
-        return new IgniteCacheProxy<>(cache);
+        return new IgniteCacheProxy<>(cache.context(), cache, null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
index d8fbdfe..2544eb9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionEx.java
@@ -16,6 +16,7 @@ import org.gridgain.grid.cache.store.*;
 import org.gridgain.grid.kernal.processors.cache.dr.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.util.*;
 
 /**
@@ -380,4 +381,15 @@ public interface GridCacheProjectionEx<K, V> extends GridCacheProjection<K, V> {
      * @return Primary entry set.
      */
     public Set<GridCacheEntry<K, V>> primaryEntrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter);
+
+    /**
+     * @return {@link ExpiryPolicy} associated with this projection.
+     */
+    public @Nullable ExpiryPolicy expiry();
+
+    /**
+     * @param plc {@link ExpiryPolicy} to associate with this projection.
+     * @return New projection based on this one, but with the specified expiry policy.
+     */
+    public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
index 67eb9e8..9a73e64 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProjectionImpl.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -73,6 +74,9 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     /** */
     private boolean keepPortable;
 
+    /** */
+    private ExpiryPolicy expiryPlc;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -95,7 +99,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
         @Nullable IgnitePredicate<? super GridCacheEntry<K, V>> entryFilter,
         @Nullable Set<GridCacheFlag> flags,
         @Nullable UUID subjId,
-        boolean keepPortable) {
+        boolean keepPortable,
+        @Nullable ExpiryPolicy expiryPlc) {
         assert parent != null;
         assert cctx != null;
 
@@ -125,6 +130,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
         qry = new GridCacheQueriesImpl<>(cctx, this);
 
         this.keepPortable = keepPortable;
+
+        this.expiryPlc = expiryPlc;
     }
 
     /**
@@ -367,8 +374,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     @Override public GridCacheProjectionEx<K, V> forSubjectId(UUID subjId) {
         A.notNull(subjId, "subjId");
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
-            noNullEntryFilter.entryFilter, flags, subjId, keepPortable);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            noNullEntryFilter.entryFilter,
+            flags,
+            subjId,
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl<>(cctx, prj, prj);
     }
@@ -415,7 +428,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
             (IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter,
             flags,
             subjId,
-            keepPortable);
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl((GridCacheContext<K1, V1>)cctx, prj, prj);
     }
@@ -439,8 +453,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
             }
         }
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, kvFilter,
-            noNullEntryFilter.entryFilter, flags, subjId, keepPortable);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            cctx,
+            kvFilter,
+            noNullEntryFilter.entryFilter,
+            flags,
+            subjId,
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl<>(cctx, prj, prj);
     }
@@ -463,8 +483,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
             }
         }
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
-            filter, flags, subjId, keepPortable);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            filter,
+            flags,
+            subjId,
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl<>(cctx, prj, prj);
     }
@@ -482,8 +508,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
 
         res.addAll(EnumSet.copyOf(F.asList(flags)));
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
-            noNullEntryFilter.entryFilter, res, subjId, keepPortable);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            noNullEntryFilter.entryFilter,
+            res,
+            subjId,
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl<>(cctx, prj, prj);
     }
@@ -500,8 +532,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
 
         res.removeAll(EnumSet.copyOf(F.asList(flags)));
 
-        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this, cctx, noNullKvFilter.kvFilter,
-            noNullEntryFilter.entryFilter, res, subjId, keepPortable);
+        GridCacheProjectionImpl<K, V> prj = new GridCacheProjectionImpl<>(this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            noNullEntryFilter.entryFilter,
+            res,
+            subjId,
+            keepPortable,
+            expiryPlc);
 
         return new GridCacheProxyImpl<>(cctx, prj, prj);
     }
@@ -516,7 +554,8 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
                 (IgnitePredicate<GridCacheEntry>)noNullEntryFilter.entryFilter,
                 flags,
                 subjId,
-                true);
+                true,
+                expiryPlc);
 
             return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)cctx, prj, prj);
         }
@@ -1242,6 +1281,24 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public @Nullable ExpiryPolicy expiry() {
+        return expiryPlc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        return new GridCacheProjectionImpl<>(
+            this,
+            cctx,
+            noNullKvFilter.kvFilter,
+            noNullEntryFilter.entryFilter,
+            flags,
+            subjId,
+            true,
+            plc);
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(cctx);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
index 8b6ade8..38cc2ca 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProxyImpl.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -1879,6 +1880,16 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public ExpiryPolicy expiry() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheProjectionEx<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheProxyImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
new file mode 100644
index 0000000..f7fe27a
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
@@ -0,0 +1,88 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import javax.cache.expiry.*;
+import java.io.*;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
+    /** */
+    private ExpiryPolicy plc;
+
+    /** */
+    private static final byte CREATE_TTL_MASK = 0x01;
+
+    /** */
+    private static final byte UPDATE_TTL_MASK = 0x02;
+
+    /** */
+    private Duration forCreate;
+
+    /** */
+    private Duration forUpdate;
+
+    /**
+     * @param plc Expiry policy.
+     */
+    public GridCacheExpiryPolicy(ExpiryPolicy plc) {
+        assert plc != null;
+
+        this.plc = plc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForCreation() {
+        return forCreate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForAccess() {
+        assert false;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Duration getExpiryForUpdate() {
+        return forUpdate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        byte flags = 0;
+
+        Duration create = plc.getExpiryForCreation();
+
+        if (create != null)
+            flags |= CREATE_TTL_MASK;
+
+        Duration update = plc.getExpiryForUpdate();
+
+        if (update != null)
+            flags |= UPDATE_TTL_MASK;
+
+        out.writeByte(flags);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        byte flags = in.readByte();
+
+        if ((flags & CREATE_TTL_MASK) != 0)
+            forCreate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+
+        if ((flags & UPDATE_TTL_MASK) != 0)
+            forUpdate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4991adb..fd2d98d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -33,6 +33,7 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 import sun.misc.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -594,7 +595,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(GridSecurityPermission.CACHE_PUT);
 
-        UUID subjId = ctx.subjectIdPerCall(null);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
@@ -611,7 +614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             retval,
             rawRetval,
             cached,
-            ttl,
+            prj != null ? prj.expiry() : null,
             filter,
             subjId,
             taskNameHash);
@@ -651,7 +654,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
 
-        UUID subjId = ctx.subjectIdPerCall(null);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
@@ -667,7 +672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             retval,
             rawRetval,
             cached,
-            0,
+            (filter != null && prj != null) ? prj.expiry() : null,
             filter,
             subjId,
             taskNameHash);
@@ -897,7 +902,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         boolean replicate = ctx.isDrEnabled();
 
-                        if (storeEnabled() && keys.size() > 1 && ctx.dr().receiveEnabled()) {
+                        if (storeEnabled() && keys.size() > 1 && !ctx.dr().receiveEnabled()) {
                             // This method can only be used when there are no replicated entries in the batch.
                             UpdateBatchResult<K, V> updRes = updateWithBatch(node, hasNear, req, res, locked, ver,
                                 dhtFut, completionCb, replicate, taskName);
@@ -1023,6 +1028,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             try {
                 if (!checkFilter(entry, req, res)) {
+                    // TODO IGNITE-41 update TTL.
+
                     if (log.isDebugEnabled())
                         log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
                             ", filter=" + Arrays.toString(req.filter()) + ", res=" + res + ']');
@@ -1284,6 +1291,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             K k = keys.get(i);
@@ -1331,7 +1340,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     newValBytes,
                     primary && storeEnabled(),
                     req.returnValue(),
-                    req.ttl(),
+                    expiry,
                     true,
                     true,
                     primary,
@@ -1372,14 +1381,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             transformC = (IgniteClosure<V, V>)writeVal;
 
                         if (!readersOnly)
-                            dhtFut.addWriteEntry(entry, updRes.newValue(), newValBytes, transformC,
-                                drExpireTime >= 0L ? ttl : -1L, drExpireTime, newDrVer, drExpireTime < 0L ? ttl : 0L);
+                            dhtFut.addWriteEntry(entry,
+                                updRes.newValue(),
+                                newValBytes,
+                                transformC,
+                                drExpireTime >= 0L ? ttl : -1L,
+                                drExpireTime,
+                                newDrVer,
+                                drExpireTime < 0L ? req.expiry() : null);
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes,
-                                transformC, drExpireTime < 0L ? ttl : 0L);
+                                transformC, drExpireTime < 0L ? req.expiry() : null);
                     }
                     else {
+                        // TODO IGNITE-41 ttl could be changed.
+
                         if (log.isDebugEnabled())
                             log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
@@ -1391,7 +1408,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
                             GridDrResolveResult<V> ctx = updRes.drResolveResult();
 
-                            res.nearTtl(updRes.newTtl());
+                            // TODO IGNITE-41 dr ttl for near cache.
 
                             if (ctx != null && ctx.isMerge())
                                 newValBytes = null;
@@ -1524,6 +1541,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
+            ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry<K, V> entry = entries.get(i);
@@ -1562,12 +1581,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         false,
                         false,
-                        req.ttl(),
+                        expiry,
                         true,
                         true,
                         primary,
                         ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
-                        req.filter(),
+                        req.filter(), // TODO IGNITE-41 filter already evaluated?
                         replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                         -1L,
                         -1L,
@@ -1605,11 +1624,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
 
                         if (!batchRes.readersOnly())
-                            dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.ttl());
+                            dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry());
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC,
-                                req.ttl());
+                                req.expiry());
                     }
 
                     if (hasNear) {
@@ -1625,8 +1644,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                     res.addNearValue(idx, writeVal, valBytes);
                                 }
 
-                                res.nearTtl(req.ttl());
-
                                 if (writeVal != null || !entry.valueBytes().isNull()) {
                                     IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
 
@@ -1861,9 +1878,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = new ArrayList<>(size);
 
             for (int i = 0; i < size; i++) {
-                Long ttl = req.drTtl(i);
+                long ttl = req.drTtl(i);
 
-                if (ttl == null)
+                if (ttl == -1L)
                     drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.drVersion(i)));
                 else
                     drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.drVersion(i), ttl,
@@ -1894,7 +1911,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.returnValue(),
             false,
             null,
-            req.ttl(),
+            req.expiry(),
             req.filter(),
             req.subjectId(),
             req.taskNameHash());
@@ -2020,6 +2037,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
         for (int i = 0; i < req.size(); i++) {
             K key = req.key(i);
 
@@ -2048,7 +2067,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             valBytes,
                             /*write-through*/false,
                             /*retval*/false,
-                            req.ttl(),
+                            expiry,
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 982c777..3c7da7b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -201,10 +202,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param drTtl DR TTL (optional).
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      */
-    public void addWriteEntry(GridDhtCacheEntry<K, V> entry, @Nullable V val, @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer, long ttl) {
+    public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        IgniteClosure<V, V> transformC,
+        long drTtl,
+        long drExpireTime,
+        @Nullable GridCacheVersion drVer,
+        @Nullable ExpiryPolicy expiryPlc) {
         long topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -230,7 +237,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                         writeVer,
                         syncMode,
                         topVer,
-                        ttl,
+                        expiryPlc,
                         forceTransformBackups,
                         this.updateReq.subjectId(),
                         this.updateReq.taskNameHash());
@@ -249,10 +256,14 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param entry Entry.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy..
      */
-    public void addNearWriteEntries(Iterable<UUID> readers, GridDhtCacheEntry<K, V> entry, @Nullable V val,
-        @Nullable byte[] valBytes, IgniteClosure<V, V> transformC, long ttl) {
+    public void addNearWriteEntries(Iterable<UUID> readers,
+        GridDhtCacheEntry<K, V> entry,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        IgniteClosure<V, V> transformC,
+        @Nullable ExpiryPolicy expiryPlc) {
         GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
 
         keys.add(entry.key());
@@ -276,7 +287,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     writeVer,
                     syncMode,
                     topVer,
-                    ttl,
+                    expiryPlc,
                     forceTransformBackups,
                     this.updateReq.subjectId(),
                     this.updateReq.taskNameHash());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index ad3f8da..c3b0918 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,6 +20,7 @@ import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -79,8 +80,11 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     /** Write synchronization mode. */
     private GridCacheWriteSynchronizationMode syncMode;
 
-    /** Time to live. */
-    private long ttl;
+    /** Expiry policy. */
+    private ExpiryPolicy expiryPlc;
+
+    /** Expiry policy bytes. */
+    private byte[] expiryPlcBytes;
 
     /** Keys to update. */
     @GridToStringInclude
@@ -150,7 +154,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param writeVer Write version for cache values.
      * @param syncMode Cache write synchronization mode.
      * @param topVer Topology version.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      * @param forceTransformBackups Force transform backups flag.
      * @param subjId Subject ID.
      */
@@ -161,7 +165,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         GridCacheVersion writeVer,
         GridCacheWriteSynchronizationMode syncMode,
         long topVer,
-        long ttl,
+        @Nullable ExpiryPolicy expiryPlc,
         boolean forceTransformBackups,
         UUID subjId,
         int taskNameHash
@@ -171,7 +175,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         this.futVer = futVer;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
-        this.ttl = ttl;
+        this.expiryPlc = expiryPlc;
         this.topVer = topVer;
         this.forceTransformBackups = forceTransformBackups;
         this.subjId = subjId;
@@ -360,10 +364,10 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     }
 
     /**
-     * @return Time to live.
+     * @return Expiry policy.
      */
-    public long ttl() {
-        return ttl;
+    @Nullable public ExpiryPolicy expiry() {
+        return expiryPlc;
     }
 
     /**
@@ -621,7 +625,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         _clone.drTtls = drTtls;
         _clone.drExpireTimes = drExpireTimes;
         _clone.syncMode = syncMode;
-        _clone.ttl = ttl;
+        _clone.expiryPlc = expiryPlc;
         _clone.nearKeys = nearKeys;
         _clone.nearKeyBytes = nearKeyBytes;
         _clone.nearVals = nearVals;
@@ -742,7 +746,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 11:
-                if (!commState.putLong(ttl))
+                if (!commState.putByteArray(expiryPlcBytes))
                     return false;
 
                 commState.idx++;
@@ -1037,10 +1041,12 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 11:
-                if (buf.remaining() < 8)
+                byte[] expiryPlcBytes0 = commState.getByteArray();
+
+                if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
                     return false;
 
-                ttl = commState.getLong();
+                expiryPlcBytes = expiryPlcBytes0;
 
                 commState.idx++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d660112..4750462 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -26,6 +26,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -93,8 +94,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     /** Cached entry if keys size is 1. */
     private GridCacheEntryEx<K, V> cached;
 
-    /** Time to live. */
-    private final long ttl;
+    /** Expiry policy. */
+    private final ExpiryPolicy expiryPlc;
 
     /** Future map topology version. */
     private long topVer;
@@ -141,7 +142,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         futVer = null;
         retval = false;
         fastMap = false;
-        ttl = 0;
+        expiryPlc = null;
         filter = null;
         syncMode = null;
         op = null;
@@ -162,7 +163,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
      * @param retval Return value require flag.
      * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
      * @param cached Cached entry if keys size is 1.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      * @param filter Entry filter.
      */
     public GridNearAtomicUpdateFuture(
@@ -177,7 +178,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         final boolean retval,
         final boolean rawRetval,
         @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
+        @Nullable ExpiryPolicy expiryPlc,
         final IgnitePredicate<GridCacheEntry<K, V>>[] filter,
         UUID subjId,
         int taskNameHash
@@ -201,7 +202,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         this.drRmvVals = drRmvVals;
         this.retval = retval;
         this.cached = cached;
-        this.ttl = ttl;
+        this.expiryPlc = expiryPlc;
         this.filter = filter;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
@@ -556,7 +557,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 op,
                 retval,
                 op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
-                ttl,
+                expiryPlc,
                 filter,
                 subjId,
                 taskNameHash);
@@ -662,7 +663,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                             op,
                             retval,
                             op == TRANSFORM && cctx.hasFlag(FORCE_TRANSFORM_BACKUP),
-                            ttl,
+                            expiryPlc,
                             filter,
                             subjId,
                             taskNameHash);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 00512ff..3eca7e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -14,12 +14,14 @@ import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.direct.*;
 import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.nio.*;
 import java.util.*;
@@ -88,8 +90,11 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
     /** Return value flag. */
     private boolean retval;
 
-    /** Time to live. */
-    private long ttl;
+    /** Expiry policy. */
+    private ExpiryPolicy expiryPlc;
+
+    /** Expiry policy bytes. */
+    private byte[] expiryPlcBytes;
 
     /** Filter. */
     @GridDirectTransient
@@ -132,7 +137,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
      * @param retval Return value required flag.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      * @param filter Optional filter for atomic check.
      */
     public GridNearAtomicUpdateRequest(
@@ -146,7 +151,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         GridCacheOperation op,
         boolean retval,
         boolean forceTransformBackups,
-        long ttl,
+        ExpiryPolicy expiryPlc,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
         @Nullable UUID subjId,
         int taskNameHash
@@ -162,7 +167,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         this.op = op;
         this.retval = retval;
         this.forceTransformBackups = forceTransformBackups;
-        this.ttl = ttl;
+        this.expiryPlc = expiryPlc;
         this.filter = filter;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
@@ -240,10 +245,10 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
     }
 
     /**
-     * @return Time to live.
+     * @return Expiry policy.
      */
-    public long ttl() {
-        return ttl;
+    public ExpiryPolicy expiry() {
+        return expiryPlc;
     }
 
     /**
@@ -485,6 +490,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         keyBytes = marshalCollection(keys, ctx);
         valBytes = marshalValuesCollection(vals, ctx);
         filterBytes = marshalFilter(filter, ctx);
+
+        if (expiryPlc != null)
+            expiryPlcBytes = CU.marshal(ctx, new GridCacheExpiryPolicy(expiryPlc));
     }
 
     /** {@inheritDoc} */
@@ -494,6 +502,9 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         keys = unmarshalCollection(keyBytes, ctx, ldr);
         vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
         filter = unmarshalFilter(filterBytes, ctx, ldr);
+
+        if (expiryPlcBytes != null)
+            expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -527,7 +538,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
         _clone.drTtls = drTtls;
         _clone.drExpireTimes = drExpireTimes;
         _clone.retval = retval;
-        _clone.ttl = ttl;
+        _clone.expiryPlc = expiryPlc;
         _clone.filter = filter;
         _clone.filterBytes = filterBytes;
         _clone.hasPrimary = hasPrimary;
@@ -688,7 +699,7 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
                 commState.idx++;
 
             case 15:
-                if (!commState.putLong(ttl))
+                if (!commState.putByteArray(expiryPlcBytes))
                     return false;
 
                 commState.idx++;
@@ -928,10 +939,12 @@ public class GridNearAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> im
                 commState.idx++;
 
             case 15:
-                if (buf.remaining() < 8)
+                byte[] expiryPlcBytes0 = commState.getByteArray();
+
+                if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
                     return false;
 
-                ttl = commState.getLong();
+                expiryPlcBytes = expiryPlcBytes0;
 
                 commState.idx++;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index fe16214..fd4d7dc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -94,10 +94,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
     @GridDirectVersion(1)
     private GridCacheVersion nearVer;
 
-    /** Ttl to be used for originating node's near cache update. */
-    @GridDirectVersion(1)
-    private long nearTtl;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -204,20 +200,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
     }
 
     /**
-     * @param ttl Time to live to be used for originating node's near cache update.
-     */
-    public void nearTtl(long ttl) {
-        nearTtl = ttl;
-    }
-
-    /**
-     * @return Time to live to be used for originating node's near cache update.
-     */
-    public long nearTtl() {
-        return nearTtl;
-    }
-
-    /**
      * @param nearVer Version generated on primary node to be used for originating node's near cache update.
      */
     public void nearVersion(GridCacheVersion nearVer) {
@@ -384,7 +366,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
         _clone.nearVals = nearVals;
         _clone.nearValBytes = nearValBytes;
         _clone.nearVer = nearVer;
-        _clone.nearTtl = nearTtl;
     }
 
     /** {@inheritDoc} */
@@ -461,12 +442,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
                 commState.idx++;
 
             case 9:
-                if (!commState.putLong(nearTtl))
-                    return false;
-
-                commState.idx++;
-
-            case 10:
                 if (nearValBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(nearValBytes.size()))
@@ -493,7 +468,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
-            case 11:
+            case 10:
                 if (nearValsIdxs != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(nearValsIdxs.size()))
@@ -520,7 +495,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
-            case 12:
+            case 11:
                 if (!commState.putCacheVersion(nearVer))
                     return false;
 
@@ -620,14 +595,6 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
                 commState.idx++;
 
             case 9:
-                if (buf.remaining() < 8)
-                    return false;
-
-                nearTtl = commState.getLong();
-
-                commState.idx++;
-
-            case 10:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -656,7 +623,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
-            case 11:
+            case 10:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -685,7 +652,7 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
-            case 12:
+            case 11:
                 GridCacheVersion nearVer0 = commState.getCacheVersion();
 
                 if (nearVer0 == CACHE_VER_NOT_READ)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index ce5e19c..2aa32c3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -23,6 +23,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 
@@ -148,7 +149,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
             }
 
             try {
-                processNearAtomicUpdateResponse(ver, key, val, valBytes, res.nearTtl(), req.nodeId(), req.subjectId(),
+                processNearAtomicUpdateResponse(ver,
+                    key,
+                    val,
+                    valBytes,
+                    req.expiry(),
+                    req.nodeId(),
+                    req.subjectId(),
                     taskName);
             }
             catch (IgniteCheckedException e) {
@@ -162,7 +169,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param key Key.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param ttl Time to live.
+     * @param expiryPlc Expiry policy.
      * @param nodeId Node ID.
      * @throws IgniteCheckedException If failed.
      */
@@ -171,7 +178,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         K key,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        Long ttl,
+        ExpiryPolicy expiryPlc,
         UUID nodeId,
         UUID subjId,
         String taskName
@@ -196,7 +203,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         valBytes,
                         /*write-through*/false,
                         /*retval*/false,
-                        ttl,
+                        expiryPlc != null ? expiryPlc : ctx.expiry(),
                         /*event*/true,
                         /*metrics*/true,
                         /*primary*/false,
@@ -253,6 +260,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
+
         for (int i = 0; i < req.nearSize(); i++) {
             K key = req.nearKey(i);
 
@@ -292,7 +301,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             valBytes,
                             /*write-through*/false,
                             /*retval*/false,
-                            req.ttl(),
+                            expiry,
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
index 88abfc4..ef0b4c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTest.java
@@ -46,6 +46,8 @@ public class IgniteCacheTest extends GridCommonAbstractTest {
         assert cnt >= 1 : "At least one grid must be started";
 
         startGridsMultiThreaded(cnt);
+
+        awaitPartitionMapExchange();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2a51321/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
index f72619b..0d22f62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
@@ -10,15 +10,25 @@
 package org.apache.ignite.internal.processors.cache.expiry;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;
+import org.jetbrains.annotations.*;
 
 import javax.cache.configuration.*;
 import javax.cache.expiry.*;
+import java.util.*;
 import java.util.concurrent.*;
 
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
+import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.cache.GridCacheMode.*;
+
 /**
  *
  */
@@ -36,59 +46,82 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
         stopAllGrids();
     }
 
-    @Override
-    protected int gridCount() {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
         return 2;
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
-    private class TestCreatedPolicy implements ExpiryPolicy {
-        /** */
-        private final Duration duration;
+    public void testCreated() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
 
-        /**
-         * @param ttl TTL for creation.
-         */
-        TestCreatedPolicy(long ttl) {
-            this.duration = new Duration(TimeUnit.MILLISECONDS, ttl);
-        }
+        startGrids();
 
-        /** {@inheritDoc} */
-        @Override public Duration getExpiryForCreation() {
-            return duration;
-        }
+        Collection<Integer> keys = keys();
 
-        /** {@inheritDoc} */
-        @Override public Duration getExpiryForAccess() {
-            return null;
-        }
+        IgniteCache<Integer, Integer> cache = jcache(0);
 
-        /** {@inheritDoc} */
-        @Override public Duration getExpiryForUpdate() {
-            return null;
+        for (final Integer key : keys) {
+            log.info("Test key: " + key);
+
+            cache.put(key, 1);
+
+            checkTtl(key, 60_000);
+
+            for (int i = 0; i < gridCount(); i++) {
+                assertEquals((Integer)1, cache.get(key));
+
+                checkTtl(key, 60_000);
+            }
+
+            cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 2); // Update, should not change TTL.
+
+            checkTtl(key, 60_000);
+
+            assertEquals((Integer)2, cache.get(key));
+
+            assertTrue(cache.remove(key));
+
+            cache.withExpiryPolicy(new TestPolicy(1000L, null, null)).put(key, 3); // Create with provided TTL.
+
+            checkTtl(key, 1000);
+
+            waitExpired(key);
         }
     }
 
     /**
+     * @return Test keys.
      * @throws Exception If failed.
      */
-    public void testCreated() throws Exception {
-        factory = new FactoryBuilder.SingletonFactory<>(new TestCreatedPolicy(1000));
+    private Collection<Integer> keys() throws Exception {
+        GridCache<Integer, Object> cache = cache(0);
 
-        startGrids();
+        Collection<Integer> keys = new ArrayList<>();
 
-        final Integer key = 1;
+        keys.add(primaryKey(cache));
 
-        IgniteCache<Integer, Integer> cache = jcache(0);
+        if (gridCount() > 1) {
+            keys.add(backupKey(cache));
+
+            if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED)
+                keys.add(nearKey(cache));
+        }
 
-        cache.put(1, 1);
+        return keys;
+    }
 
+    /**
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void waitExpired(final Integer key) throws Exception {
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 for (int i = 0; i < gridCount(); i++) {
-                    Object val = cache(i).peek(key);
+                    Object val = jcache(i).localPeek(key);
 
                     log.info("Value [grid=" + i + ", val=" + val + ']');
 
@@ -98,10 +131,50 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
 
                 return false;
             }
-        }, 2000);
+        }, 3000);
+
+        GridCache<Integer, Object> cache = cache(0);
+
+        for (int i = 0; i < gridCount(); i++) {
+            ClusterNode node = grid(i).cluster().localNode();
+
+            Object val = jcache(i).localPeek(key);
+
+            log.info("Value [grid=" + i +
+                ", primary=" + cache.affinity().isPrimary(node, key) +
+                ", backup=" + cache.affinity().isBackup(node, key) + ']');
+
+            assertNull("Unexpected non-null value for grid " + i, val);
+        }
 
         for (int i = 0; i < gridCount(); i++)
-            assertNull("Unexpected non-null value for grid " + i, cache.get(key));
+            assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+    }
+
+    /**
+     * @param key Key.
+     * @param ttl TTL.
+     * @throws Exception If failed.
+     */
+    private void checkTtl(Object key, long ttl) throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            GridKernal grid = (GridKernal)grid(i);
+
+            GridCacheAdapter<Object, Object> cache = grid.context().cache().internalCache();
+
+            GridCacheEntryEx<Object, Object> e = cache.peekEx(key);
+
+            if (e == null && cache.context().isNear())
+                e = cache.context().near().dht().peekEx(key);
+
+            if (e == null) {
+                assertTrue(i > 0);
+
+                assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
+            }
+            else
+                assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
+        }
     }
 
     /** {@inheritDoc} */
@@ -110,11 +183,61 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
 
         GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
 
-        cfg.setCacheMode(GridCacheMode.PARTITIONED);
-        cfg.setAtomicityMode(GridCacheAtomicityMode.ATOMIC);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setBackups(1);
+
+        cfg.setDistributionMode(PARTITIONED_ONLY);
 
         cfg.setExpiryPolicyFactory(factory);
 
         return cfg;
     }
+
+    /**
+     *
+     */
+    private class TestPolicy implements ExpiryPolicy {
+        /** */
+        private Long create;
+
+        /** */
+        private Long access;
+
+        /** */
+        private Long update;
+
+        /**
+         * @param create TTL for creation.
+         * @param access TTL for access.
+         * @param update TTL for update.
+         */
+        TestPolicy(@Nullable Long create,
+           @Nullable Long access,
+           @Nullable Long update) {
+            this.create = create;
+            this.access = access;
+            this.update = update;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            return create != null ? new Duration(TimeUnit.MILLISECONDS, create) : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            return access != null ? new Duration(TimeUnit.MILLISECONDS, access) : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            return update != null ? new Duration(TimeUnit.MILLISECONDS, update) : null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestPolicy.class, this);
+        }
+    }
 }


Mime
View raw message