ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [12/38] incubator-ignite git commit: # ignite-41
Date Tue, 23 Dec 2014 08:22:48 GMT
# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5f95bd2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5f95bd2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5f95bd2e

Branch: refs/heads/ignite-1
Commit: 5f95bd2e5eb0227f1e329feb8b0c27ff3184f8c2
Parents: 18b5b5a
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Dec 18 13:42:29 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Dec 18 14:58:50 2014 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  |   3 +-
 .../cache/GridCacheAccessExpiryPolicy.java      |  35 +++-
 .../processors/cache/GridCacheAdapter.java      |  33 +++-
 .../processors/cache/GridCacheMapEntry.java     |   2 -
 .../processors/cache/GridCacheTxHandler.java    |  17 +-
 .../cache/GridCacheTxLocalAdapter.java          |   8 -
 .../GridDistributedTxRemoteAdapter.java         |  39 -----
 .../distributed/dht/GridDhtCacheAdapter.java    | 162 ++++++++++++++-----
 .../cache/distributed/dht/GridDhtGetFuture.java |  70 ++++++--
 .../distributed/dht/GridDhtTxFinishFuture.java  |  21 +++
 .../distributed/dht/GridDhtTxFinishRequest.java |  95 +++++++++++
 .../cache/distributed/dht/GridDhtTxRemote.java  |  18 ++-
 .../dht/GridPartitionedGetFuture.java           |  23 ++-
 .../dht/atomic/GridDhtAtomicCache.java          |  35 ++--
 .../dht/colocated/GridDhtColocatedCache.java    |  45 ++++--
 .../distributed/near/GridNearCacheAdapter.java  |  16 +-
 .../distributed/near/GridNearGetFuture.java     |  39 +++--
 .../distributed/near/GridNearGetRequest.java    |  10 ++
 .../near/GridNearTransactionalCache.java        |  18 ++-
 .../cache/distributed/near/GridNearTxLocal.java |   2 +-
 .../near/GridNearTxPrepareRequest.java          |   7 +
 .../distributed/near/GridNearTxRemote.java      |   8 +-
 .../local/atomic/GridLocalAtomicCache.java      |  18 ++-
 .../cache/IgniteCacheAbstractTest.java          |  15 ++
 .../IgniteCacheAtomicExpiryPolicyTest.java      |   6 +
 ...AtomicPrimaryWriteOrderExpiryPolicyTest.java |  24 +++
 .../IgniteCacheExpiryPolicyAbstractTest.java    | 102 +++++++++---
 .../IgniteCacheExpiryPolicyTestSuite.java       |   1 +
 .../bamboo/GridDataGridTestSuite.java           |   4 +-
 29 files changed, 664 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9ba7c45..87086a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -163,7 +163,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_PORT = 47100;
 
     /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */
-    public static final int DFLT_SHMEM_PORT = 48100;
+    // FIXME IGNITE-41.
+    public static final int DFLT_SHMEM_PORT = -1;
 
     /** Default idle connection timeout (value is <tt>30000</tt>ms). */
     public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
index 0b6152d..07f4ae8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -9,11 +9,13 @@
 
 package org.gridgain.grid.kernal.processors.cache;
 
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.lang.*;
 import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
+import java.util.*;
 
 /**
  *
@@ -23,7 +25,7 @@ public class GridCacheAccessExpiryPolicy {
     private final long ttl;
 
     /** */
-    private GridCacheTtlUpdateRequest req;
+    private volatile Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries;
 
     /**
      * @param expiryPlc Expiry policy.
@@ -58,24 +60,41 @@ public class GridCacheAccessExpiryPolicy {
     }
 
     /**
+     *
+     */
+    public void reset() {
+        Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
+
+        if (entries0 != null)
+            entries0.clear();
+    }
+
+    /**
      * @param key Entry key.
      * @param keyBytes Entry key bytes.
      * @param ver Entry version.
      */
     @SuppressWarnings("unchecked")
     public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
-        if (req == null)
-            req = new GridCacheTtlUpdateRequest(ttl);
+        Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries0 = entries;
+
+        if (entries0 == null) {
+            synchronized (this) {
+                entries0 = entries;
 
-        req.addEntry(key, keyBytes, ver);
+                if (entries0 == null)
+                    entries0 = entries = new ConcurrentHashMap8<>();
+            }
+        }
+
+        entries0.put(key, new IgniteBiTuple<>(keyBytes, ver));
     }
 
     /**
      * @return TTL update request.
      */
-    @SuppressWarnings("unchecked")
-    @Nullable public <K, V> GridCacheTtlUpdateRequest<K, V> request() {
-        return (GridCacheTtlUpdateRequest<K, V>)req;
+    @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries() {
+        return entries;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 1f40a7c..a4ec90f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1737,16 +1737,37 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         boolean deserializePortable,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter
     ) {
-        subjId = ctx.subjectIdPerCall(subjId);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
-        return getAllAsync(keys, entry, !skipTx, subjId, taskName, deserializePortable, forcePrimary, filter);
+        subjId = ctx.subjectIdPerCall(subjId, prj);
+
+        ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
+        if (expiryPlc == null)
+            expiryPlc = ctx.expiry();
+
+        return getAllAsync(keys,
+            entry,
+            !skipTx,
+            subjId,
+            taskName,
+            deserializePortable,
+            forcePrimary,
+            GridCacheAccessExpiryPolicy.forPolicy(expiryPlc),
+            filter);
     }
 
     /** {@inheritDoc} */
     public IgniteFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached, boolean checkTx, @Nullable final UUID subjId, final String taskName,
-        final boolean deserializePortable, final boolean forcePrimary,
-        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+        @Nullable GridCacheEntryEx<K, V> cached,
+        boolean checkTx,
+        @Nullable final UUID subjId,
+        final String taskName,
+        final boolean deserializePortable,
+        final boolean forcePrimary,
+        @Nullable GridCacheAccessExpiryPolicy expiry,
+        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>... filter
+        ) {
         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
         ctx.denyOnFlag(LOCAL);
@@ -1813,7 +1834,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                                 null,
                                 taskName,
                                 filter,
-                                null);
+                                expiry);
 
                             GridCacheVersion ver = entry.version();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index e4ccc11..c191dda 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -1127,8 +1127,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         @Nullable UUID subjId,
         String taskName
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
-       // log.info("Inner set " + key + " " + val + " " + ttl);
-
         V old;
 
         boolean valid = valid(tx != null ? tx.topologyVersion() : topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
index fa85566..170d7a3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
@@ -16,6 +16,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.future.*;
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.typedef.*;
@@ -759,10 +760,10 @@ public class GridCacheTxHandler<K, V> {
         if (nearTx != null && nearTx.local())
             nearTx = null;
 
-        finish(nodeId, dhtTx, req, req.writes());
+        finish(nodeId, dhtTx, req, req.writes(), req.ttls());
 
         if (nearTx != null)
-            finish(nodeId, nearTx, req, req.nearWrites());
+            finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls());
 
         sendReply(nodeId, req);
     }
@@ -772,12 +773,14 @@ public class GridCacheTxHandler<K, V> {
      * @param tx Transaction.
      * @param req Request.
      * @param writes Writes.
+     * @param ttls TTLs for optimistic transaction.
      */
     protected void finish(
         UUID nodeId,
         GridCacheTxRemoteEx<K, V> tx,
         GridDhtTxFinishRequest<K, V> req,
-        Collection<GridCacheTxEntry<K, V>> writes) {
+        Collection<GridCacheTxEntry<K, V>> writes,
+        @Nullable GridLongList ttls) {
         // We don't allow explicit locks for transactions and
         // therefore immediately return if transaction is null.
         // However, we may decide to relax this restriction in
@@ -799,6 +802,8 @@ public class GridCacheTxHandler<K, V> {
             log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
                 ", tx=" + tx + ']');
 
+        assert ttls == null || tx.concurrency() == OPTIMISTIC;
+
         try {
             if (req.commit() || req.isSystemInvalidate()) {
                 if (tx.commitVersion(req.commitVersion())) {
@@ -819,6 +824,12 @@ public class GridCacheTxHandler<K, V> {
                                     entry + ", tx=" + tx + ']');
                         }
                     }
+                    else if (tx.concurrency() == OPTIMISTIC && ttls != null) {
+                        int idx = 0;
+
+                        for (GridCacheTxEntry<K, V> e : tx.writeEntries())
+                            e.ttl(ttls.get(idx));
+                    }
 
                     // Complete remote candidates.
                     tx.doneRemote(req.baseVersion(), null, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 1c96b32..479948c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -586,8 +586,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
             addGroupTxMapping(writeSet());
 
         if (!empty) {
-            log.info("User commit");
-
             // We are holding transaction-level locks for entries here, so we can get next write version.
             writeVersion(cctx.versions().next(topologyVersion()));
 
@@ -666,10 +664,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                         }
                                     }
 
-                                    // Preserve TTL if needed.
-                                    if (txEntry.ttl() < 0)
-                                        txEntry.ttl(cached.ttl());
-
                                     // Deal with DR conflicts.
                                     GridCacheVersion explicitVer = txEntry.drVersion() != null ?
                                         txEntry.drVersion() : writeVersion();
@@ -2736,8 +2730,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
             old.cached(entry, old.keyBytes());
             old.filters(filter);
 
-            long ttl = -1L;
-
             // Update ttl if specified.
             if (drTtl >= 0L) {
                 assert drExpireTime >= 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3b34552..5fdafe9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -293,43 +293,6 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
     }
 
     /**
-     * @param key Key to add to read set.
-     * @param keyBytes Key bytes.
-     * @param drVer Data center replication version.
-     */
-    public void addRead(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, byte[] keyBytes, @Nullable GridCacheVersion drVer) {
-        checkInternal(key);
-
-        GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, READ, null, 0L, -1L,
-            cacheCtx.cache().entryEx(key.key()), drVer);
-
-        txEntry.keyBytes(keyBytes);
-
-        readMap.put(key, txEntry);
-    }
-
-    /**
-     * @param key Key to add to write set.
-     * @param keyBytes Key bytes.
-     * @param op Cache operation.
-     * @param val Write value.
-     * @param valBytes Write value bytes.
-     * @param drVer Data center replication version.
-     */
-    public void addWrite(GridCacheContext<K, V> cacheCtx, GridCacheTxKey<K> key, byte[] keyBytes, GridCacheOperation op, V val, byte[] valBytes,
-        @Nullable GridCacheVersion drVer) {
-        checkInternal(key);
-
-        GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L,
-            cacheCtx.cache().entryEx(key.key()), drVer);
-
-        txEntry.keyBytes(keyBytes);
-        txEntry.valueBytes(valBytes);
-
-        writeMap.put(key, txEntry);
-    }
-
-    /**
      * @param e Transaction entry to set.
      * @return {@code True} if value was set.
      */
@@ -446,8 +409,6 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
     @SuppressWarnings({"CatchGenericClass"})
     private void commitIfLocked() throws IgniteCheckedException {
         if (state() == COMMITTING) {
-            log.info("commitIfLocked");
-
             for (GridCacheTxEntry<K, V> txEntry : writeMap.values()) {
                 assert txEntry != null : "Missing transaction entry for tx: " + this;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 328a8d5..fef3b82 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -10,6 +10,7 @@
 package org.gridgain.grid.kernal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
@@ -102,33 +103,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         });
     }
 
-    /**
-     * @param req Request.
-     */
-    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
-        int size = req.keys().size();
-
-        for (int i = 0; i < size; i++) {
-            try {
-                GridCacheEntryEx<K, V> entry;
-
-                if (ctx.isSwapOrOffheapEnabled()) {
-                    entry = ctx.cache().entryEx(req.key(i), true);
-
-                    entry.unswap(true, false);
-                }
-                else
-                    entry = ctx.cache().peekEx(req.key(i));
-
-                if (entry != null)
-                    entry.updateTtl(req.version(i), req.ttl());
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to unswap entry.", e);
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void stop() {
         super.stop();
@@ -454,7 +428,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, org.apache.ignite.lang.IgnitePredicate[])}
+     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, long, UUID, int, boolean, IgnitePredicate[], GridCacheAccessExpiryPolicy)}
      * method instead to retrieve DHT value.
      *
      * @param keys {@inheritDoc}
@@ -473,8 +447,15 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         boolean deserializePortable,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
-        return getAllAsync(keys, null, /*don't check local tx. */false, subjId, taskName, deserializePortable,
-            forcePrimary, filter);
+        return getAllAsync(keys,
+            null,
+            /*don't check local tx. */false,
+            subjId,
+            taskName,
+            deserializePortable,
+            forcePrimary,
+            null,
+            filter);
     }
 
     /** {@inheritDoc} */
@@ -490,12 +471,28 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
      * @param keys Keys to get
-     * @param filter {@inheritDoc}
-     * @return {@inheritDoc}
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param filter Optional filter.
+     * @param expiry Expiry policy.
+     * @return Get future.
      */
-    IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys, @Nullable UUID subjId,
-        String taskName, boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        return getAllAsync(keys, null, /*don't check local tx. */false, subjId, taskName, deserializePortable, false,
+    IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expiry
+        ) {
+        return getAllAsync(keys,
+            null,
+            /*don't check local tx. */false,
+            subjId,
+            taskName,
+            deserializePortable,
+            false,
+            expiry,
             filter);
     }
 
@@ -505,7 +502,11 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param keys Keys to get.
      * @param reload Reload flag.
      * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param deserializePortable Deserialize portable flag.
      * @param filter Optional filter.
+     * @param expiry Expiry policy.
      * @return DHT future.
      */
     public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader,
@@ -516,7 +517,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         int taskNameHash,
         boolean deserializePortable,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expiry) {
         GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
             msgId,
             reader,
@@ -527,7 +529,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             filter,
             subjId,
             taskNameHash,
-            deserializePortable);
+            deserializePortable,
+            expiry);
 
         fut.init();
 
@@ -541,6 +544,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) {
         assert isAffinityNode(cacheCfg);
 
+        long ttl = req.accessTtl();
+
+        final GridCacheAccessExpiryPolicy expiryPlc = ttl == -1L ? null : new GridCacheAccessExpiryPolicy(ttl);
+
         IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
             getDhtAsync(nodeId,
                 req.messageId(),
@@ -550,7 +557,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 req.subjectId(),
                 req.taskNameHash(),
                 false,
-                req.filter());
+                req.filter(),
+                expiryPlc);
 
         fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() {
             @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) {
@@ -582,10 +590,88 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
                         ",req=" + req + ", res=" + res + ']', e);
                 }
+
+                sendTtlUpdateRequest(expiryPlc);
             }
         });
     }
 
+    /**
+     * @param expiryPlc Expiry policy.
+     */
+    protected void sendTtlUpdateRequest(@Nullable final GridCacheAccessExpiryPolicy expiryPlc) {
+        if (expiryPlc != null && expiryPlc.entries() != null) {
+            ctx.closures().runLocalSafe(new Runnable() {
+                @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
+                @Override public void run() {
+                    Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries = expiryPlc.entries();
+
+                    assert entries != null && !entries.isEmpty();
+
+                    Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>();
+
+                    long topVer = ctx.discovery().topologyVersion();
+
+                    for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) {
+                        List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer);
+
+                        for (int i = 0; i < nodes.size(); i++) {
+                            ClusterNode node = nodes.get(i);
+
+                            if (!node.isLocal()) {
+                                GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
+
+                                if (req == null) {
+                                    reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(expiryPlc.ttl()));
+
+                                    req.cacheId(ctx.cacheId());
+                                }
+
+                                req.addEntry((K)e.getKey(), e.getValue().get1(), e.getValue().get2());
+                            }
+                        }
+                    }
+
+                    for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) {
+                        try {
+                            ctx.io().send(req.getKey(), req.getValue());
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Failed to send TTL update request.", e);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * @param req Request.
+     */
+    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
+        int size = req.keys().size();
+
+        for (int i = 0; i < size; i++) {
+            try {
+                GridCacheEntryEx<K, V> entry;
+
+                if (ctx.isSwapOrOffheapEnabled()) {
+                    entry = ctx.cache().entryEx(req.key(i), true);
+
+                    entry.unswap(true, false);
+                }
+                else
+                    entry = ctx.cache().peekEx(req.key(i));
+
+                if (entry != null)
+                    entry.updateTtl(req.version(i), req.ttl());
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to unswap entry.", e);
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void unlockAll(Collection<? extends K> keys,
         IgnitePredicate<GridCacheEntry<K, V>>[] filter) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 6db3540..2ce4b6c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -83,6 +83,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     /** Whether to deserialize portable objects. */
     private boolean deserializePortable;
 
+    /** Expiry policy. */
+    private GridCacheAccessExpiryPolicy expiryPlc;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -99,6 +102,10 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * @param tx Transaction.
      * @param topVer Topology version.
      * @param filters Filters.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      */
     public GridDhtGetFuture(
         GridCacheContext<K, V> cctx,
@@ -111,7 +118,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean deserializePortable) {
+        boolean deserializePortable,
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
         super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer());
 
         assert reader != null;
@@ -128,6 +136,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         this.subjId = subjId;
         this.deserializePortable = deserializePortable;
         this.taskNameHash = taskNameHash;
+        this.expiryPlc = expiryPlc;
 
         futId = IgniteUuid.randomUuid();
 
@@ -325,11 +334,30 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         IgniteFuture<Map<K, V>> fut;
 
         if (txFut == null || txFut.isDone()) {
-            if (reload && cctx.isStoreEnabled() && cctx.store().configured())
-                fut = cache().reloadAllAsync(keys.keySet(), true, subjId, taskName, filters);
-            else
-                fut = tx == null ? cache().getDhtAllAsync(keys.keySet(), subjId, taskName, deserializePortable, filters) :
-                    tx.getAllAsync(cctx, keys.keySet(), null, deserializePortable, filters);
+            if (reload && cctx.isStoreEnabled() && cctx.store().configured()) {
+                fut = cache().reloadAllAsync(keys.keySet(),
+                    true,
+                    subjId,
+                    taskName,
+                    filters);
+            }
+            else {
+                if (tx == null) {
+                    fut = cache().getDhtAllAsync(keys.keySet(),
+                        subjId,
+                        taskName,
+                        deserializePortable,
+                        filters,
+                        expiryPlc);
+                }
+                else {
+                    fut = tx.getAllAsync(cctx,
+                        keys.keySet(),
+                        null,
+                        deserializePortable,
+                        filters);
+                }
+            }
         }
         else {
             // If we are here, then there were active transactions for some entries
@@ -342,12 +370,30 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         if (e != null)
                             throw new GridClosureException(e);
 
-                        if (reload)
-                            return cache().reloadAllAsync(keys.keySet(), true, subjId, taskName, filters);
-                        else
-                            return tx == null ?
-                                cache().getDhtAllAsync(keys.keySet(), subjId, taskName, deserializePortable, filters) :
-                                tx.getAllAsync(cctx, keys.keySet(), null, deserializePortable, filters);
+                        if (reload && cctx.isStoreEnabled() && cctx.store().configured()) {
+                            return cache().reloadAllAsync(keys.keySet(),
+                                true,
+                                subjId,
+                                taskName,
+                                filters);
+                        }
+                        else {
+                            if (tx == null) {
+                                return cache().getDhtAllAsync(keys.keySet(),
+                                    subjId,
+                                    taskName,
+                                    deserializePortable,
+                                    filters,
+                                    expiryPlc);
+                            }
+                            else {
+                                return tx.getAllAsync(cctx,
+                                    keys.keySet(),
+                                    null,
+                                    deserializePortable,
+                                    filters);
+                            }
+                        }
                     }
                 },
                 cctx.kernalContext());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index a836ff8..ee989a5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -320,6 +320,20 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.subjectId(),
                 tx.taskNameHash());
 
+            if (!tx.pessimistic()) {
+                int idx = 0;
+
+                for (GridCacheTxEntry<K, V> e : dhtMapping.writes())
+                    req.ttl(idx++, e.ttl());
+
+                if (nearMapping != null) {
+                    idx = 0;
+
+                    for (GridCacheTxEntry<K, V> e : nearMapping.writes())
+                        req.nearTtl(idx++, e.ttl());
+                }
+            }
+
             if (tx.onePhaseCommit())
                 req.writeVersion(tx.writeVersion());
 
@@ -377,6 +391,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     tx.subjectId(),
                     tx.taskNameHash());
 
+                if (!tx.pessimistic()) {
+                    int idx = 0;
+
+                    for (GridCacheTxEntry<K, V> e : nearMapping.writes())
+                        req.nearTtl(idx++, e.ttl());
+                }
+
                 if (tx.onePhaseCommit())
                     req.writeVersion(tx.writeVersion());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c5db862..f4946e9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -15,6 +15,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.direct.*;
 import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
@@ -74,6 +75,12 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
     @GridDirectVersion(2)
     private int taskNameHash;
 
+    /** TTLs for optimistic transaction. */
+    private GridLongList ttls;
+
+    /** Near entry TTLs for optimistic transaction. */
+    private GridLongList nearTtls;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -93,6 +100,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sysInvalidate System invalidation flag.
+     * @param syncCommit Synchronous commit flag.
+     * @param syncRollback Synchronous rollback flag.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
@@ -103,6 +112,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
      * @param recoverWrites Recovery write entries.
      * @param onePhaseCommit One phase commit flag.
      * @param grpLockKey Group lock key.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
      */
     public GridDhtTxFinishRequest(
         UUID nearNodeId,
@@ -242,6 +253,56 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
         return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
     }
 
+    /**
+     * Appends TTL of DHT write {@code idx} (call in index order); skipped leading slots are padded with {@code -1}.
+     *
+     * @param idx Entry index.
+     * @param ttl TTL.
+     */
+    public void ttl(int idx, long ttl) {
+        if (ttl != -1L && ttls == null) {
+            ttls = new GridLongList();
+
+            for (int i = 0; i < idx; i++)
+                ttls.add(-1L);
+        }
+
+        if (ttls != null)
+            ttls.add(ttl);
+    }
+
+    /**
+     * @return DHT entry TTLs for optimistic transaction, or {@code null} if no TTL list was created or received.
+     */
+    public GridLongList ttls() {
+        return ttls;
+    }
+
+    /**
+     * Appends TTL of near write {@code idx} (call in index order); skipped leading slots are padded with {@code -1}.
+     *
+     * @param idx Entry index.
+     * @param ttl TTL.
+     */
+    public void nearTtl(int idx, long ttl) {
+        if (ttl != -1L && nearTtls == null) {
+            nearTtls = new GridLongList();
+
+            for (int i = 0; i < idx; i++)
+                nearTtls.add(-1L);
+        }
+
+        if (nearTtls != null)
+            nearTtls.add(ttl);
+    }
+
+    /**
+     * @return Near entry TTLs for optimistic transaction, or {@code null} if no TTL list was created or received.
+     */
+    public GridLongList nearTtls() {
+        return nearTtls;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -304,6 +365,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
         _clone.writeVer = writeVer;
         _clone.subjId = subjId;
         _clone.taskNameHash = taskNameHash;
+        _clone.ttls = ttls;
+        _clone.nearTtls = nearTtls;
     }
 
     /** {@inheritDoc} */
@@ -430,6 +493,18 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
 
                 commState.idx++;
 
+            case 31:
+                if (!commState.putLongList(ttls))
+                    return false;
+
+                commState.idx++;
+
+            case 32:
+                if (!commState.putLongList(nearTtls))
+                    return false;
+
+                commState.idx++;
+
         }
 
         return true;
@@ -584,6 +659,26 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
 
                 commState.idx++;
 
+            case 31:
+                GridLongList ttls0 = commState.getLongList();
+
+                if (ttls0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                ttls = ttls0;
+
+                commState.idx++;
+
+            case 32:
+                GridLongList nearTtls0 = commState.getLongList();
+
+                if (nearTtls0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                nearTtls = nearTtls0;
+
+                commState.idx++;
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3bc41b2..2ac605b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -276,8 +276,13 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
      * @param drVer Data center replication version.
      * @param clos Transform closures.
      */
-    public void addWrite(GridCacheContext<K, V> cacheCtx, GridCacheOperation op, GridCacheTxKey<K> key, byte[] keyBytes,
-        @Nullable V val, @Nullable byte[] valBytes, @Nullable Collection<IgniteClosure<V, V>> clos,
+    public void addWrite(GridCacheContext<K, V> cacheCtx,
+        GridCacheOperation op,
+        GridCacheTxKey<K> key,
+        byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        @Nullable Collection<IgniteClosure<V, V>> clos,
         @Nullable GridCacheVersion drVer) {
         checkInternal(key);
 
@@ -286,7 +291,14 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
 
         GridDhtCacheEntry<K, V> cached = cacheCtx.dht().entryExx(key.key(), topologyVersion());
 
-        GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached, drVer);
+        GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx,
+            this,
+            op,
+            val,
+            -1L,
+            -1L,
+            cached,
+            drVer);
 
         txEntry.keyBytes(keyBytes);
         txEntry.valueBytes(valBytes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 34539d4..4a3491d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -88,8 +88,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     /** Whether to deserialize portable objects. */
     private boolean deserializePortable;
 
-    /** */
-    private GridCacheAccessExpiryPolicy expiry;
+    /** Expiry policy. */
+    private GridCacheAccessExpiryPolicy expiryPlc;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -104,8 +104,12 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * @param topVer Topology version.
      * @param reload Reload flag.
      * @param forcePrimary If {@code true} then will force network trip to primary node even
-     *          if called on backup node.
+     *        if called on backup node.
      * @param filters Filters.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
@@ -117,7 +121,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        @Nullable GridCacheAccessExpiryPolicy expiry
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -132,7 +136,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         this.subjId = subjId;
         this.deserializePortable = deserializePortable;
         this.taskName = taskName;
-        this.expiry = expiry;
+        this.expiryPlc = expiryPlc;
 
         futId = IgniteUuid.randomUuid();
 
@@ -233,6 +237,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             if (trackable)
                 cctx.mvcc().removeFuture(this);
 
+            cache().sendTtlUpdateRequest(expiryPlc);
+
             return true;
         }
 
@@ -304,7 +310,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                         subjId,
                         taskName == null ? 0 : taskName.hashCode(),
                         deserializePortable,
-                        filters);
+                        filters,
+                        expiryPlc);
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -356,7 +363,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     filters,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
-                    expiry != null ? expiry.ttl() : -1L);
+                    expiryPlc != null ? expiryPlc.ttl() : -1L);
 
                 add(fut); // Append new future.
 
@@ -417,7 +424,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                                 null,
                                 taskName,
                                 filters,
-                                null);
+                                expiryPlc);
 
                             colocated.context().evicts().touch(entry, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1d49097..3b07fc0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -275,10 +275,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     false,
                     forcePrimary,
                     filter,
-                    expiryPlc,
                     subjId0,
                     taskName,
-                    deserializePortable);
+                    deserializePortable,
+                    expiryPlc);
             }
         });
     }
@@ -705,20 +705,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param reload Reload flag.
      * @param forcePrimary Force primary flag.
      * @param filter Filter.
-     * @param expiryPlc Expiry policy.
      * @param subjId Subject ID.
      * @param taskName Task name.
      * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      * @return Get future.
      */
     private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
         boolean reload,
         boolean forcePrimary,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable ExpiryPolicy expiryPlc,
         UUID subjId,
         String taskName,
-        boolean deserializePortable) {
+        boolean deserializePortable,
+        @Nullable ExpiryPolicy expiryPlc) {
         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
         if (F.isEmpty(keys))
@@ -813,32 +813,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
 
             if (success) {
-                if (expiry != null && expiry.request() != null) {
-                    ctx.closures().runLocalSafe(new Runnable() {
-                        @Override public void run() {
-                            try {
-                                GridCacheTtlUpdateRequest<K, V> req = expiry.request();
-
-                                assert req != null;
-                                assert !F.isEmpty(req.keys());
-
-                                Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1);
-
-                                req.cacheId(ctx.cacheId());
-
-                                ctx.io().safeSend(nodes, req, null);
-                            }
-                            catch (IgniteCheckedException e) {
-                                log.error("Failed to send TTL update request.", e);
-                            }
-                        }
-                    });
-                }
+                sendTtlUpdateRequest(expiry);
 
                 return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
             }
         }
 
+        if (expiry != null)
+            expiry.reset();
+
         // Either reload or not all values are available locally.
         GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
             keys,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 84f3165..6e9f921 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -24,6 +24,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 
@@ -169,9 +170,19 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         long topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
 
-        subjId = ctx.subjectIdPerCall(subjId);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
-        return loadAsync(keys, false, forcePrimary, topVer, subjId, taskName, deserializePortable, filter);
+        subjId = ctx.subjectIdPerCall(subjId, prj);
+
+        return loadAsync(keys,
+            false,
+            forcePrimary,
+            topVer,
+            subjId,
+            taskName,
+            deserializePortable,
+            filter,
+            prj != null ? prj.expiry() : null);
     }
 
     /** {@inheritDoc} */
@@ -212,6 +223,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param taskName Task name.
      * @param deserializePortable Deserialize portable flag.
      * @param filter Filter.
+     * @param expiryPlc Expiry policy.
      * @return Loaded values.
      */
     public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
@@ -221,13 +233,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable ExpiryPolicy expiryPlc) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final GridCacheAccessExpiryPolicy expiry =
+            GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!reload && !forcePrimary) {
             Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f);
@@ -258,7 +274,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 null,
                                 taskName,
                                 filter,
-                                null);
+                                expiry);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
@@ -306,10 +322,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                     break;
             }
 
-            if (success)
+            if (success) {
+                sendTtlUpdateRequest(expiry);
+
                 return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+            }
         }
 
+        if (expiry != null)
+            expiry.reset();
+
         // Either reload or not all values are available locally.
         GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
             keys,
@@ -320,7 +342,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             subjId,
             taskName,
             deserializePortable,
-            null);
+            expiry);
 
         fut.init();
 
@@ -332,9 +354,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      *
      * {@inheritDoc}
      */
-    @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout,
-        @Nullable GridCacheTxLocalEx<K, V> tx, boolean isInvalidate, boolean isRead, boolean retval,
-        @Nullable GridCacheTxIsolation isolation, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+    @Override public IgniteFuture<Boolean> lockAllAsync(Collection<? extends K> keys,
+        long timeout,
+        @Nullable GridCacheTxLocalEx<K, V> tx,
+        boolean isInvalidate,
+        boolean isRead,
+        boolean retval,
+        @Nullable GridCacheTxIsolation isolation,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         assert tx == null || tx instanceof GridNearTxLocal;
 
         GridNearTxLocal<K, V> txx = (GridNearTxLocal<K, V>)tx;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index db1a058..6c2fa8c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -248,9 +248,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param filter Filter.
      * @return Loaded values.
      */
-    public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx, @Nullable Collection<? extends K> keys,
-        boolean reload, boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
-        @Nullable UUID subjId, String taskName, boolean deserializePortable) {
+    public IgniteFuture<Map<K, V>> loadAsync(@Nullable GridCacheTxEx tx,
+        @Nullable Collection<? extends K> keys,
+        boolean reload,
+        boolean forcePrimary,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
 
@@ -259,6 +264,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
 
         GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null;
 
+        // TODO IGNITE-41.
+
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             reload,
@@ -267,7 +274,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             filter,
             subjId,
             taskName,
-            deserializePortable);
+            deserializePortable,
+            null);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index d23236a..b5d5e29 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
@@ -91,6 +90,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     /** Whether to deserialize portable objects. */
     private boolean deserializePortable;
 
+    /** Expiry policy. */
+    private GridCacheAccessExpiryPolicy expiryPlc;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -106,6 +108,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      *      called on backup node.
      * @param tx Transaction.
      * @param filters Filters.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
      */
     public GridNearGetFuture(
         GridCacheContext<K, V> cctx,
@@ -116,11 +122,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
         @Nullable UUID subjId,
         String taskName,
-        boolean deserializePortable
+        boolean deserializePortable,
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
-        assert cctx != null;
         assert !F.isEmpty(keys);
 
         this.cctx = cctx;
@@ -132,6 +138,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         this.subjId = subjId;
         this.taskName = taskName;
         this.deserializePortable = deserializePortable;
+        this.expiryPlc = expiryPlc;
 
         futId = IgniteUuid.randomUuid();
 
@@ -292,8 +299,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             // If this is the primary or backup node for the keys.
             if (n.isLocal()) {
                 final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
-                    dht().getDhtAsync(n.id(), -1, mappedKeys, reload, topVer, subjId,
-                        taskName == null ? 0 : taskName.hashCode(), deserializePortable, filters);
+                    dht().getDhtAsync(n.id(),
+                        -1,
+                        mappedKeys,
+                        reload,
+                        topVer,
+                        subjId,
+                        taskName == null ? 0 : taskName.hashCode(),
+                        deserializePortable,
+                        filters,
+                        expiryPlc);
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -351,7 +366,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     filters,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
-                    -1L);
+                    expiryPlc != null ? expiryPlc.ttl() : -1L);
 
                 add(fut); // Append new future.
 
@@ -406,7 +421,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         null,
                         taskName,
                         filters,
-                        null);
+                        expiryPlc);
 
                 ClusterNode primary = null;
 
@@ -432,7 +447,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 null,
                                 taskName,
                                 filters,
-                                null);
+                                expiryPlc);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null && isNew && entry.markObsoleteIfEmpty(ver))
@@ -549,10 +564,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param keys Keys.
      * @param infos Entry infos.
      * @param savedVers Saved versions.
+     * @param topVer Topology version.
      * @return Result map.
      */
-    private Map<K, V> loadEntries(UUID nodeId, Collection<K> keys, Collection<GridCacheEntryInfo<K, V>> infos,
-        Map<K, GridCacheVersion> savedVers, long topVer) {
+    private Map<K, V> loadEntries(UUID nodeId,
+        Collection<K> keys,
+        Collection<GridCacheEntryInfo<K, V>> infos,
+        Map<K, GridCacheVersion> savedVers,
+        long topVer) {
         boolean empty = F.isEmpty(keys);
 
         Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new GridLeanMap<K, V>(keys.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
index 552012b..b6765fb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -90,6 +90,9 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
      * @param reload Reload flag.
      * @param topVer Topology version.
      * @param filter Filter.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
      */
     public GridNearGetRequest(
         int cacheId,
@@ -186,6 +189,13 @@ public class GridNearGetRequest<K, V> extends GridCacheMessage<K, V> implements
     }
 
     /**
+     * @return New TTL to set after entry is accessed, -1 to leave unchanged.
+     */
+    public long accessTtl() {
+        return accessTtl;
+    }
+
+    /**
      * @param ctx Cache context.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9b4d117..c8d90e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -119,12 +119,22 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @param filter Filter.
      * @return Future.
      */
-    IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx, @Nullable Collection<? extends K> keys,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean deserializePortable) {
+    IgniteFuture<Map<K, V>> txLoadAsync(GridNearTxLocal<K, V> tx,
+        @Nullable Collection<? extends K> keys,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        boolean deserializePortable) {
         assert tx != null;
 
-        GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, false, false, tx, filter,
-            CU.subjectId(tx, ctx.shared()), tx.resolveTaskName(), deserializePortable);
+        GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
+            keys,
+            false,
+            false,
+            tx,
+            filter,
+            CU.subjectId(tx, ctx.shared()),
+            tx.resolveTaskName(),
+            deserializePortable,
+            null);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 2b974e9..13233c2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -286,7 +286,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
         }
         else if (cacheCtx.isColocated()) {
             return cacheCtx.colocated().loadAsync(keys, /*reload*/false, /*force primary*/false, topologyVersion(),
-                CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null)
+                CU.subjectId(this, cctx), resolveTaskName(), deserializePortable, null, null)
                 .chain(new C1<IgniteFuture<Map<K, V>>, Boolean>() {
                     @Override public Boolean apply(IgniteFuture<Map<K, V>> f) {
                         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 30b7aef..0cc5001 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -75,6 +75,8 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
      * @param txNodes Transaction nodes mapping.
      * @param last {@code True} if this last prepare request for node.
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
      */
     public GridNearTxPrepareRequest(
         IgniteUuid futId,
@@ -202,6 +204,11 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
     }
 
     /** {@inheritDoc} */
+    @Override protected boolean transferExpiryPolicy() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public GridTcpCommunicationMessageAdapter clone() {
         GridNearTxPrepareRequest _clone = new GridNearTxPrepareRequest();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index aa3546b..9cc8552 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -344,7 +344,13 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
                     return false;
                 }
                 else {
-                    GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx, this, op, val, 0L, -1L, cached,
+                    GridCacheTxEntry<K, V> txEntry = new GridCacheTxEntry<>(cacheCtx,
+                        this,
+                        op,
+                        val,
+                        -1L,
+                        -1L,
+                        cached,
                         drVer);
 
                     txEntry.keyBytes(keyBytes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 6eda650..d901311 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -573,16 +573,26 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         boolean deserializePortable) throws IgniteCheckedException {
         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
-        UUID subjId = ctx.subjectIdPerCall(null);
-
         if (F.isEmpty(keys))
             return Collections.emptyMap();
 
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null, prj);
+
+        ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
+        if (expiryPlc == null)
+            expiryPlc = ctx.expiry();
+
         Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
 
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final GridCacheAccessExpiryPolicy expiry =
+            GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
         boolean success = true;
 
         for (K key : keys) {
@@ -608,7 +618,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                             null,
                             taskName,
                             filter,
-                            null);
+                            expiry);
 
                         if (v != null)
                             vals.put(key, v);
@@ -653,7 +663,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             return map;
         }
 
-        return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, filter).get();
+        return getAllAsync(keys, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index 9293431..e1e54a6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -18,6 +18,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.testframework.junits.common.*;
 
+import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
 import static org.gridgain.grid.cache.GridCacheMode.*;
 import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*;
 
@@ -89,6 +90,13 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
         cfg.setSwapEnabled(swapEnabled());
         cfg.setCacheMode(cacheMode());
         cfg.setAtomicityMode(atomicityMode());
+
+        if (atomicityMode() == ATOMIC && cacheMode() != LOCAL) {
+            assert atomicWriteOrderMode() != null;
+
+            cfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
+        }
+
         cfg.setWriteSynchronizationMode(writeSynchronization());
         cfg.setDistributionMode(distributionMode());
         cfg.setPortableEnabled(portableEnabled());
@@ -110,6 +118,13 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
     protected abstract GridCacheAtomicityMode atomicityMode();
 
     /**
+     * @return Atomic cache write order mode.
+     */
+    protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return null;
+    }
+
+    /**
      * @return Partitioned mode.
      */
     protected abstract GridCacheDistributionMode distributionMode();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
index 251bc05..9e7dff5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicExpiryPolicyTest.java
@@ -11,6 +11,7 @@ package org.apache.ignite.internal.processors.cache.expiry;
 
 import org.gridgain.grid.cache.*;
 
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
 import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
 import static org.gridgain.grid.cache.GridCacheDistributionMode.*;
 import static org.gridgain.grid.cache.GridCacheMode.*;
@@ -35,6 +36,11 @@ public class IgniteCacheAtomicExpiryPolicyTest extends IgniteCacheExpiryPolicyAb
     }
 
     /** {@inheritDoc} */
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CLOCK;
+    }
+
+    /** {@inheritDoc} */
     @Override protected GridCacheDistributionMode distributionMode() {
         return PARTITIONED_ONLY;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5f95bd2e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java
new file mode 100644
index 0000000..dec3a05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest.java
@@ -0,0 +1,24 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.gridgain.grid.cache.*;
+
+import static org.gridgain.grid.cache.GridCacheAtomicWriteOrderMode.*;
+
+/**
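+ * Runs the {@link IgniteCacheAtomicExpiryPolicyTest} checks with {@code PRIMARY} atomic write order mode.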
+ */
+public class IgniteCacheAtomicPrimaryWriteOrderExpiryPolicyTest extends IgniteCacheAtomicExpiryPolicyTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return PRIMARY;
+    }
+}


Mime
View raw message