ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/5] incubator-ignite git commit: # ignite-41
Date Wed, 17 Dec 2014 15:12:01 GMT
# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76429793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76429793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76429793

Branch: refs/heads/ignite-41
Commit: 764297932db43970d4cdcdb54475064bb76d44a1
Parents: 91227df
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 17 18:11:41 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 17 18:11:41 2014 +0300

----------------------------------------------------------------------
 .../cache/GridCacheAccessExpiryPolicy.java      |  85 ++++++
 .../processors/cache/GridCacheAdapter.java      |   3 +-
 .../processors/cache/GridCacheContext.java      |  11 +-
 .../processors/cache/GridCacheEntryEx.java      |   6 +-
 .../processors/cache/GridCacheMapEntry.java     |  39 ++-
 .../processors/cache/GridCacheTxAdapter.java    |   3 +-
 .../cache/GridCacheTxLocalAdapter.java          |  19 +-
 .../distributed/GridCacheTtlUpdateRequest.java  | 285 +++++++++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |  58 +++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   3 +-
 .../dht/GridPartitionedGetFuture.java           |  14 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  83 +++++-
 .../dht/colocated/GridDhtColocatedCache.java    |  27 +-
 .../distributed/near/GridNearCacheAdapter.java  |  11 +-
 .../distributed/near/GridNearGetFuture.java     |  10 +-
 .../local/atomic/GridLocalAtomicCache.java      |  12 +-
 .../GridTcpCommunicationMessageFactory.java     |   7 +-
 .../IgniteCacheExpiryPolicyAbstractTest.java    | 110 +++----
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   3 +-
 20 files changed, 649 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
new file mode 100644
index 0000000..0b6152d
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAccessExpiryPolicy.java
@@ -0,0 +1,85 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache;
+
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.expiry.*;
+
+/**
+ * Holds the access TTL taken from an expiry policy and collects entries whose TTL was updated.
+ */
+public class GridCacheAccessExpiryPolicy {
+    /** TTL for access, asserted to be non-negative in the constructor. */
+    private final long ttl;
+
+    /** TTL update request, created on the first {@link #ttlUpdated} call. */
+    private GridCacheTtlUpdateRequest req;
+
+    /**
+     * @param expiryPlc Expiry policy, may be {@code null}.
+     * @return Access expiry policy, or {@code null} if policy is {@code null} or has no access expiry.
+     */
+    public static GridCacheAccessExpiryPolicy forPolicy(@Nullable ExpiryPolicy expiryPlc) {
+        if (expiryPlc == null)
+            return null;
+
+        Duration duration = expiryPlc.getExpiryForAccess();
+
+        if (duration == null)
+            return null;
+
+        return new GridCacheAccessExpiryPolicy(GridCacheMapEntry.toTtl(duration));
+    }
+
+    /**
+     * @param ttl TTL for access, must not be negative.
+     */
+    public GridCacheAccessExpiryPolicy(long ttl) {
+        assert ttl >= 0 : ttl;
+
+        this.ttl = ttl;
+    }
+
+    /**
+     * @return TTL for access.
+     */
+    public long ttl() {
+        return ttl;
+    }
+
+    /**
+     * @param key Key of the entry whose TTL was updated.
+     * @param keyBytes Entry key bytes.
+     * @param ver Entry version.
+     */
+    @SuppressWarnings("unchecked")
+    public void ttlUpdated(Object key, byte[] keyBytes, GridCacheVersion ver) {
+        if (req == null)
+            req = new GridCacheTtlUpdateRequest(ttl);
+
+        req.addEntry(key, keyBytes, ver);
+    }
+
+    /**
+     * @return TTL update request, or {@code null} if {@link #ttlUpdated} was never called.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <K, V> GridCacheTtlUpdateRequest<K, V> request() {
+        return (GridCacheTtlUpdateRequest<K, V>)req;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheAccessExpiryPolicy.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 39b7338..1f40a7c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -1812,7 +1812,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                                 subjId,
                                 null,
                                 taskName,
-                                filter);
+                                filter,
+                                null);
 
                             GridCacheVersion ver = entry.version();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 931e243..cb4337e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -1090,8 +1090,17 @@ public class GridCacheContext<K, V> implements Externalizable {
         if (subjId != null)
             return subjId;
 
-        GridCacheProjectionImpl<K, V> prj = projectionPerCall();
+        return subjectIdPerCall(subjId, projectionPerCall());
+    }
 
+    /**
+     * Gets subject ID per call.
+     *
+     * @param subjId Optional already existing subject ID.
+     * @param prj Optional thread local projection.
+     * @return Subject ID per call.
+     */
+    public UUID subjectIdPerCall(@Nullable UUID subjId, @Nullable GridCacheProjectionImpl<K, V> prj) {
         if (prj != null)
             subjId = prj.subjectId();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
index 76d73b3..2b38247 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheEntryEx.java
@@ -270,10 +270,11 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
      *        temporary object can used for filter evaluation or transform closure execution and
      *        should not be returned to user.
      * @param subjId Subject ID initiated this read.
+     * @param transformClo Transform closure to record event.
      * @param taskName Task name.
      * @param filter Filter to check prior to getting the value. Note that filter check
      *      together with getting the value is an atomic operation.
-     * @param transformClo Transform closure to record event.
+     * @param expiryPlc Expiry policy.
      * @return Cached value.
      * @throws IgniteCheckedException If loading value failed.
      * @throws GridCacheEntryRemovedException If entry was removed.
@@ -290,7 +291,8 @@ public interface GridCacheEntryEx<K, V> extends GridMetadataAware {
         UUID subjId,
         Object transformClo,
         String taskName,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 7f9ff4d..00f7382 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -699,7 +699,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         UUID subjId,
         Object transformClo,
         String taskName,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expirePlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
         cctx.denyOnFlag(LOCAL);
 
@@ -714,7 +715,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             subjId,
             transformClo,
             taskName,
-            filter);
+            filter,
+            expirePlc);
     }
 
     /** {@inheritDoc} */
@@ -730,7 +732,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         UUID subjId,
         Object transformClo,
         String taskName,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter)
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc)
         throws IgniteCheckedException, GridCacheEntryRemovedException, GridCacheFilterFailedException {
         // Disable read-through if there is no store.
         if (readThrough && !cctx.isStoreEnabled())
@@ -877,6 +880,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 // No more notifications.
                 evt = false;
             }
+
+            if (ret != null && expiryPlc != null) {
+                long ttl = expiryPlc.ttl();
+
+                assert ttl >= 0 : ttl;
+
+                updateTtl(ttl);
+
+                expiryPlc.ttlUpdated(key(), getOrMarshalKeyBytes(), version());
+            }
         }
 
         if (asyncRefresh && !readThrough && cctx.isStoreEnabled()) {
@@ -906,7 +919,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 subjId,
                 transformClo,
                 taskName,
-                filter);
+                filter,
+                expiryPlc);
         }
 
         boolean loadedFromStore = false;
@@ -987,7 +1001,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             subjId,
             transformClo,
             taskName,
-            filter);
+            filter,
+            expiryPlc);
     }
 
     /** {@inheritDoc} */
@@ -1658,6 +1673,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         return toTtl(duration);
     }
 
+    /**
+     * @param duration Duration.
+     * @return TTL.
+     */
     public static long toTtl(Duration duration) {
         if (duration == null)
             return -1;
@@ -1826,17 +1845,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
                 if (!pass) {
-                    if (!isNew() && expiryPlc != null) {
+                    if (hasValueUnlocked() && expiryPlc != null) {
                         Duration duration = expiryPlc.getExpiryForAccess();
 
-                        if (duration != null)
-                            updateTtl(toTtl(duration));
+                        newTtl = toTtl(duration);
+
+                        if (newTtl != -1L)
+                            updateTtl(newTtl);
                     }
 
                     return new GridCacheUpdateAtomicResult<>(false,
                         retval ? old : null,
                         null,
-                        0L,
+                        newTtl,
                         -1L,
                         null,
                         null,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
index 7a32afa..edf1e92 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
@@ -1166,7 +1166,8 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
                         /*subjId*/subjId,
                         /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
                         resolveTaskName(),
-                        CU.<K, V>empty());
+                        CU.<K, V>empty(),
+                        null);
 
                 try {
                     for (IgniteClosure<V, V> clos : txEntry.transformClosures())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 7b6f266..1c96b32 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -663,8 +663,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                                 expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
 
                                             txEntry.ttl(GridCacheMapEntry.toTtl(duration));
-
-                                            log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", plc=" + expiry);
                                         }
                                     }
 
@@ -1081,7 +1079,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                     CU.subjectId(this, cctx),
                                     transformClo,
                                     resolveTaskName(),
-                                    filter);
+                                    filter,
+                                    null);
 
                                 if (val != null) {
                                     if (!readCommitted())
@@ -1155,7 +1154,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                 CU.subjectId(this, cctx),
                                 null,
                                 resolveTaskName(),
-                                filter);
+                                filter,
+                                null);
 
                             if (val != null) {
                                 V val0 = val;
@@ -1501,7 +1501,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                         CU.subjectId(GridCacheTxLocalAdapter.this, cctx),
                                         transformClo,
                                         resolveTaskName(),
-                                        filter);
+                                        filter,
+                                        null);
 
                                     // If value is in cache and passed the filter.
                                     if (val != null) {
@@ -1844,7 +1845,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
 
                             if (optimistic()) {
                                 try {
-                                    //Should read through if filter is specified.
+                                    // Should read through if filter is specified.
                                     old = entry.innerGet(this,
                                         /*swap*/false,
                                         /*read-through*/readThrough,
@@ -1856,7 +1857,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                         CU.subjectId(this, cctx),
                                         transformClo,
                                         resolveTaskName(),
-                                        CU.<K, V>empty());
+                                        CU.<K, V>empty(),
+                                        null);
                                 }
                                 catch (GridCacheFilterFailedException e) {
                                     e.printStackTrace();
@@ -2065,7 +2067,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                         CU.subjectId(this, cctx),
                                         null,
                                         resolveTaskName(),
-                                        CU.<K, V>empty());
+                                        CU.<K, V>empty(),
+                                        null);
                             }
                             catch (GridCacheFilterFailedException e) {
                                 e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
new file mode 100644
index 0000000..71e314e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -0,0 +1,285 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.direct.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Cache message carrying a new TTL together with the keys and versions of the entries it relates to.
+ */
+public class GridCacheTtlUpdateRequest<K, V> extends GridCacheMessage<K, V> {
+    /** Entry key bytes, written and read by {@link #writeTo} and {@link #readFrom}. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> keysBytes;
+
+    /** Entry keys, not sent directly; restored from key bytes in {@link #finishUnmarshal}. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private List<K> keys;
+
+    /** Entry versions, added in the same order as keys. */
+    @GridDirectCollection(GridCacheVersion.class)
+    private List<GridCacheVersion> vers;
+
+    /** New TTL. */
+    private long ttl;
+
+    /**
+     * Required empty constructor.
+     */
+    public GridCacheTtlUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param ttl TTL, must not be negative.
+     */
+    public GridCacheTtlUpdateRequest(long ttl) {
+        assert ttl >= 0 : ttl;
+
+        this.ttl = ttl;
+    }
+
+    /**
+     * @param key Key.
+     * @param keyBytes Key bytes.
+     * @param ver Version.
+     */
+    public void addEntry(K key, byte[] keyBytes, GridCacheVersion ver) {
+        if (keys == null) { // All three lists are created together on the first call.
+            keys = new ArrayList<>();
+
+            keysBytes = new ArrayList<>();
+
+            vers = new ArrayList<>();
+        }
+
+        keys.add(key);
+
+        keysBytes.add(keyBytes);
+
+        vers.add(ver);
+    }
+
+    /**
+     * @return Keys, {@code null} if no entry was added and no key bytes were unmarshalled.
+     */
+    public List<K> keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr)
+        throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (keys == null && keysBytes != null) // Keys are restored only when received as bytes.
+            keys = unmarshalCollection(keysBytes, ctx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 82;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("CloneDoesntCallSuperClone")
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridCacheTtlUpdateRequest _clone = new GridCacheTtlUpdateRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) { // Cases fall through; a false return leaves idx at the unfinished field.
+            case 3: // keysBytes: size followed by each byte array, or -1 if null.
+                if (keysBytes != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(keysBytes.size()))
+                            return false;
+
+                        commState.it = keysBytes.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putByteArray((byte[])commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+            case 4: // ttl.
+                if (!commState.putLong(ttl))
+                    return false;
+
+                commState.idx++;
+
+            case 5: // vers: size followed by each version, or -1 if null.
+                if (vers != null) {
+                    if (commState.it == null) {
+                        if (!commState.putInt(vers.size()))
+                            return false;
+
+                        commState.it = vers.iterator();
+                    }
+
+                    while (commState.it.hasNext() || commState.cur != NULL) {
+                        if (commState.cur == NULL)
+                            commState.cur = commState.it.next();
+
+                        if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
+                            return false;
+
+                        commState.cur = NULL;
+                    }
+
+                    commState.it = null;
+                } else {
+                    if (!commState.putInt(-1))
+                        return false;
+                }
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) { // Cases fall through; a false return leaves idx at the unfinished field.
+            case 3: // keysBytes: a negative size leaves the list null.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (keysBytes == null)
+                        keysBytes = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        byte[] _val = commState.getByteArray();
+
+                        if (_val == BYTE_ARR_NOT_READ)
+                            return false;
+
+                        keysBytes.add((byte[])_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+            case 4: // ttl.
+                if (buf.remaining() < 8)
+                    return false;
+
+                ttl = commState.getLong();
+
+                commState.idx++;
+
+            case 5: // vers: a negative size leaves the list null.
+                if (commState.readSize == -1) {
+                    if (buf.remaining() < 4)
+                        return false;
+
+                    commState.readSize = commState.getInt();
+                }
+
+                if (commState.readSize >= 0) {
+                    if (vers == null)
+                        vers = new ArrayList<>(commState.readSize);
+
+                    for (int i = commState.readItems; i < commState.readSize; i++) {
+                        GridCacheVersion _val = commState.getCacheVersion();
+
+                        if (_val == CACHE_VER_NOT_READ)
+                            return false;
+
+                        vers.add((GridCacheVersion)_val);
+
+                        commState.readItems++;
+                    }
+                }
+
+                commState.readSize = -1;
+                commState.readItems = 0;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridCacheTtlUpdateRequest _clone = (GridCacheTtlUpdateRequest)_msg;
+
+        _clone.keysBytes = keysBytes; // Lists are shared by reference, not copied.
+        _clone.keys = keys;
+        _clone.vers = vers;
+        _clone.ttl = ttl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTtlUpdateRequest.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c13f8f4..3557d17 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -92,6 +92,24 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        super.start();
+
+        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) {
+                processTtlUpdateRequest(req);
+            }
+        });
+    }
+
+    /**
+     * @param req Request.
+     */
+    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
+        log.info("Ttl update: " + req);
+    }
+
+    /** {@inheritDoc} */
     @Override public void stop() {
         super.stop();
 
@@ -470,11 +488,26 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param filter Optional filter.
      * @return DHT future.
      */
-    public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader, long msgId,
-        LinkedHashMap<? extends K, Boolean> keys, boolean reload, long topVer, @Nullable UUID subjId,
-        int taskNameHash, boolean deserializePortable, IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, msgId, reader, keys, reload, /*tx*/null,
-            topVer, filter, subjId, taskNameHash, deserializePortable);
+    public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader,
+        long msgId,
+        LinkedHashMap<? extends K, Boolean> keys,
+        boolean reload,
+        long topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean deserializePortable,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
+            msgId,
+            reader,
+            keys,
+            reload,
+            /*tx*/null,
+            topVer,
+            filter,
+            subjId,
+            taskNameHash,
+            deserializePortable);
 
         fut.init();
 
@@ -489,13 +522,22 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         assert isAffinityNode(cacheCfg);
 
         IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
-            getDhtAsync(nodeId, req.messageId(), req.keys(), req.reload(), req.topologyVersion(), req.subjectId(),
-                req.taskNameHash(), false, req.filter());
+            getDhtAsync(nodeId,
+                req.messageId(),
+                req.keys(),
+                req.reload(),
+                req.topologyVersion(),
+                req.subjectId(),
+                req.taskNameHash(),
+                false,
+                req.filter());
 
         fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() {
             @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) {
                 GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(),
-                    req.futureId(), req.miniId(), req.version());
+                    req.futureId(),
+                    req.miniId(),
+                    req.version());
 
                 GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
                     (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 7cee7d9..dfbe4fa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -971,7 +971,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                             CU.subjectId(tx, ctx.shared()),
                                             null,
                                             tx != null ? tx.resolveTaskName() : null,
-                                            CU.<K, V>empty());
+                                            CU.<K, V>empty(),
+                                            null);
 
                                     assert e.lockedBy(mappedVer) ||
                                         (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index f79bc2f..5f6af05 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -293,8 +293,15 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             // If this is the primary or backup node for the keys.
             if (n.isLocal()) {
                 final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
-                    cache().getDhtAsync(n.id(), -1, mappedKeys, reload, topVer, subjId,
-                        taskName == null ? 0 : taskName.hashCode(), deserializePortable, filters);
+                    cache().getDhtAsync(n.id(),
+                        -1,
+                        mappedKeys,
+                        reload,
+                        topVer,
+                        subjId,
+                        taskName == null ? 0 : taskName.hashCode(),
+                        deserializePortable,
+                        filters);
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -405,7 +412,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                                 subjId,
                                 null,
                                 taskName,
-                                filters);
+                                filters,
+                                null);
 
                             colocated.context().evicts().touch(entry, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d1deed4..6287e16 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -16,6 +16,7 @@ import org.apache.ignite.plugin.security.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.kernal.managers.communication.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
@@ -141,6 +142,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @SuppressWarnings({"IfMayBeConditional", "SimplifiableIfStatement"})
     @Override public void start() throws IgniteCheckedException {
+        super.start();
+
         resetMetrics();
 
         preldr = new GridDhtPreloader<>(ctx);
@@ -258,13 +261,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final boolean deserializePortable,
         @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter
     ) {
-        subjId = ctx.subjectIdPerCall(subjId);
+        GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+
+        subjId = ctx.subjectIdPerCall(null, prj);
 
         final UUID subjId0 = subjId;
 
+        final ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+
         return asyncOp(new CO<IgniteFuture<Map<K, V>>>() {
             @Override public IgniteFuture<Map<K, V>> apply() {
-                return getAllAsync0(keys, false, forcePrimary, filter, subjId0, taskName, deserializePortable);
+                return getAllAsync0(keys,
+                    false,
+                    forcePrimary,
+                    filter,
+                    expiryPlc,
+                    subjId0,
+                    taskName,
+                    deserializePortable);
             }
         });
     }
@@ -595,7 +609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
 
-        UUID subjId = ctx.subjectIdPerCall(null); // TODO IGNITE-41.
+        UUID subjId = ctx.subjectIdPerCall(null, prj);
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
@@ -691,10 +705,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param reload Reload flag.
      * @param forcePrimary Force primary flag.
      * @param filter Filter.
+     * @param expiryPlc Expiry policy.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
      * @return Get future.
      */
-    private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys, boolean reload,
-        boolean forcePrimary, @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter, UUID subjId, String taskName,
+    private IgniteFuture<Map<K, V>> getAllAsync0(@Nullable Collection<? extends K> keys,
+        boolean reload,
+        boolean forcePrimary,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable ExpiryPolicy expiryPlc,
+        UUID subjId,
+        String taskName,
         boolean deserializePortable) {
         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
@@ -712,6 +735,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean success = true;
 
+            final GridCacheAccessExpiryPolicy expiry =
+                GridCacheAccessExpiryPolicy.forPolicy(expiryPlc != null ? expiryPlc : ctx.expiry());
+
             // Optimistically expect that all keys are available locally (avoid creation of get future).
             for (K key : keys) {
                 GridCacheEntryEx<K, V> entry = null;
@@ -735,7 +761,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 subjId,
                                 null,
                                 taskName,
-                                filter);
+                                filter,
+                                expiry);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
@@ -785,13 +812,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     break;
             }
 
-            if (success)
+            if (success) {
+                if (expiry != null && expiry.request() != null) {
+                    ctx.closures().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            try {
+                                GridCacheTtlUpdateRequest<K, V> req = expiry.request();
+
+                                assert !F.isEmpty(req.keys());
+
+                                Collection<ClusterNode> nodes = ctx.affinity().remoteNodes(req.keys(), -1);
+
+                                req.cacheId(ctx.cacheId());
+
+                                ctx.io().safeSend(nodes, req, null);
+                            }
+                            catch (IgniteCheckedException e) {
+                                log.error("Failed to send TTL update request.", e);
+                            }
+                        }
+                    });
+                }
+
                 return ctx.wrapCloneMap(new GridFinishedFuture<>(ctx.kernalContext(), locVals));
+            }
         }
 
         // Either reload or not all values are available locally.
-        GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary,
-            filter, subjId, taskName, deserializePortable);
+        GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
+            keys,
+            topVer,
+            reload,
+            forcePrimary,
+            filter,
+            subjId,
+            taskName,
+            deserializePortable);
 
         fut.init();
 
@@ -1066,7 +1122,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         req.subjectId(),
                         transform,
                         taskName,
-                        CU.<K, V>empty());
+                        CU.<K, V>empty(),
+                        null);
 
                     if (transformMap == null)
                         transformMap = new HashMap<>();
@@ -1174,7 +1231,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.subjectId(),
                             null,
                             taskName,
-                            CU.<K, V>empty());
+                            CU.<K, V>empty(),
+                            null);
 
                         updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
 
@@ -1207,7 +1265,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.subjectId(),
                             null,
                             taskName,
-                            CU.<K, V>empty());
+                            CU.<K, V>empty(),
+                            null);
 
                         IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
                             entry.key(), old);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 8e9f808..1052e1d 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -207,11 +207,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param keys Keys to load.
      * @param reload Reload flag.
      * @param forcePrimary Force get from primary node flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
      * @param filter Filter.
      * @return Loaded values.
      */
-    public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys, boolean reload,
-        boolean forcePrimary, long topVer, @Nullable UUID subjId, String taskName, boolean deserializePortable,
+    public IgniteFuture<Map<K, V>> loadAsync(@Nullable Collection<? extends K> keys,
+        boolean reload,
+        boolean forcePrimary,
+        long topVer,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
         @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(ctx.kernalContext(), Collections.<K, V>emptyMap());
@@ -248,7 +257,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                 subjId,
                                 null,
                                 taskName,
-                                filter);
+                                filter,
+                                null);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null) {
@@ -301,8 +311,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         }
 
         // Either reload or not all values are available locally.
-        GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, keys, topVer, reload, forcePrimary,
-            filter, subjId, taskName, deserializePortable);
+        GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx,
+            keys,
+            topVer,
+            reload,
+            forcePrimary,
+            filter,
+            subjId,
+            taskName,
+            deserializePortable);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
index b785103..db1a058 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -259,8 +259,15 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
 
         GridCacheTxLocalEx<K, V> txx = (tx != null && tx.local()) ? (GridCacheTxLocalEx<K, V>)tx : null;
 
-        GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx, keys, reload, forcePrimary, txx, filter,
-            subjId, taskName, deserializePortable);
+        GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
+            keys,
+            reload,
+            forcePrimary,
+            txx,
+            filter,
+            subjId,
+            taskName,
+            deserializePortable);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
index 6e1f494..1f1de06 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -250,7 +250,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param mapped Mappings to check for duplicates.
      * @param topVer Topology version to map on.
      */
-    private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, final long topVer) {
+    private void map(Collection<? extends K> keys,
+        Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped,
+        final long topVer) {
         Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
 
         if (affNodes.isEmpty()) {
@@ -402,7 +404,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         subjId,
                         null,
                         taskName,
-                        filters);
+                        filters,
+                        null);
 
                 ClusterNode primary = null;
 
@@ -427,7 +430,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 subjId,
                                 null,
                                 taskName,
-                                filters);
+                                filters,
+                                null);
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null && isNew && entry.markObsoleteIfEmpty(ver))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 433d199..6eda650 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -607,7 +607,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                             subjId,
                             null,
                             taskName,
-                            filter);
+                            filter,
+                            null);
 
                         if (v != null)
                             vals.put(key, v);
@@ -924,7 +925,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                             subjId,
                             transform,
                             taskName,
-                            CU.<K, V>empty());
+                            CU.<K, V>empty(),
+                            null);
 
                         V updated = transform.apply(old);
 
@@ -1004,7 +1006,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                                 subjId,
                                 null,
                                 taskName,
-                                CU.<K, V>empty());
+                                CU.<K, V>empty(),
+                                null);
 
                             val = ctx.config().getInterceptor().onBeforePut(entry.key(), old, val);
 
@@ -1034,7 +1037,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                                 subjId,
                                 null,
                                 taskName,
-                                CU.<K, V>empty());
+                                CU.<K, V>empty(),
+                                null);
 
                             IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
                                 entry.key(), old);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
index 7c92065..b2ae55b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageFactory.java
@@ -37,7 +37,7 @@ import java.util.*;
  */
 public class GridTcpCommunicationMessageFactory {
     /** Common message producers. */
-    private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[82];
+    private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[83];
 
     /**
      * Custom messages registry. Used for test purposes.
@@ -264,6 +264,9 @@ public class GridTcpCommunicationMessageFactory {
                     case 81:
                         return new GridJobExecuteRequestV2();
 
+                    case 82:
+                        return new GridCacheTtlUpdateRequest();
+
                     default:
                         assert false : "Invalid message type.";
 
@@ -274,7 +277,7 @@ public class GridTcpCommunicationMessageFactory {
            20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
            40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
            60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */    73, 74, 75, 76, 77, 78, 79,
-           80, 81);
+           80, 81, 82);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index c8abd0e..93b0405 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -125,6 +125,37 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
     /**
      * @throws Exception If failed.
      */
+    public void testAccess() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, 62_000L));
+
+        startGrids();
+
+        for (final Integer key : keys()) {
+            log.info("Test access [key=" + key + ']');
+
+            access(key);
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @throws Exception If failed.
+     */
+    private void access(Integer key) throws Exception {
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        cache.put(key, 1);
+
+        checkTtl(key, 60_000L);
+
+        assertEquals((Integer)1, cache.get(key));
+
+        checkTtl(key, 62_000L);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCreateUpdate() throws Exception {
         factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
 
@@ -299,83 +330,6 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
         checkTtl(key, 0L);
     }
 
-    public void _testPrimary() throws Exception {
-        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
-        nearCache = true;
-
-        boolean inTx = true;
-
-        startGrids();
-
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        GridCache<Integer, Object> cache0 = cache(0);
-
-        Integer key = primaryKey(cache0);
-
-        log.info("Create: " + key);
-
-        GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
-        cache.put(key, 1);
-
-        if (tx != null)
-            tx.commit();
-
-        checkTtl(key, 60_000);
-
-        tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
-        log.info("Update: " + key);
-
-        cache.put(key, 2);
-
-        if (tx != null)
-            tx.commit();
-
-        checkTtl(key, 61_000);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void _test1() throws Exception {
-        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
-
-        nearCache = false;
-
-        boolean inTx = true;
-
-        startGrids();
-
-        Collection<Integer> keys = keys();
-
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        for (final Integer key : keys) {
-            log.info("Test key1: " + key);
-
-            GridCacheTx tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
-            cache.put(key, 1);
-
-            if (tx != null)
-                tx.commit();
-
-            log.info("Test key2: " + key);
-
-            tx = inTx ? grid(0).transactions().txStart(OPTIMISTIC, READ_COMMITTED) : null;
-
-            cache.put(key, 2);
-
-            if (tx != null)
-                tx.commit();
-
-            log.info("Done");
-        }
-    }
-
     /**
      * @param key Key.
      * @param txConcurrency Not null transaction concurrency mode if explicit transaction should be started.
@@ -436,7 +390,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
      * @return Transaction.
      */
     @Nullable private GridCacheTx startTx(@Nullable GridCacheTxConcurrency txConcurrency) {
-        return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, READ_COMMITTED);
+        return txConcurrency == null ? null : ignite(0).transactions().txStart(txConcurrency, REPEATABLE_READ);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index fd2d205..7f25b3f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -16,7 +16,7 @@ import junit.framework.*;
  */
 public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
     /**
-     * @return Cache API test suite.
+     * @return Cache Expiry Policy test suite.
      * @throws Exception If failed.
      */
     public static TestSuite suite() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76429793/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
index 16e5b25..bbce8ca 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheTestEntryEx.java
@@ -402,7 +402,8 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme
         UUID subjId,
         Object transformClo,
         String taskName,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable GridCacheAccessExpiryPolicy expiryPlc) {
         return val;
     }
 


Mime
View raw message