ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [18/18] incubator-ignite git commit: # ignite-41
Date Tue, 16 Dec 2014 15:12:21 GMT
# ignite-41


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/688a2e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/688a2e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/688a2e71

Branch: refs/heads/ignite-41
Commit: 688a2e71509b9b3cebd148045d7388386e5ce0fb
Parents: e85a938
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Dec 16 13:06:26 2014 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 16 18:11:24 2014 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   3 +-
 .../processors/cache/GridCacheIoManager.java    |   2 +
 .../processors/cache/GridCacheMapEntry.java     |  31 +-
 .../cache/GridCacheTxLocalAdapter.java          |  58 +++-
 .../cache/GridCacheUpdateAtomicResult.java      |   2 +-
 .../distributed/GridCacheExpiryPolicy.java      |  74 ++++-
 .../GridDistributedTxRemoteAdapter.java         |   2 +
 .../dht/atomic/GridDhtAtomicCache.java          |  79 +++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  30 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 172 ++++++----
 .../atomic/GridNearAtomicUpdateResponse.java    | 111 ++++++-
 .../distributed/near/GridNearAtomicCache.java   |  35 ++-
 .../expiry/IgniteCacheExpiryPolicyTest.java     | 310 +++++++++++++++++--
 13 files changed, 749 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index f1f4436..39b7338 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -2043,8 +2043,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
                 return tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value();
             }
 
-            @Override
-            public String toString() {
+            @Override public String toString() {
                 return "put [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
             }
         }));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index a222c32..a50e461 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -200,6 +200,8 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
 
             unmarshall(nodeId, cacheMsg);
 
+            log.info("Message: " + cacheMsg);
+
             if (cacheMsg.allowForStartup())
                 processMessage(nodeId, cacheMsg, c);
             else {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
index 6483c8a..2c377e7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMapEntry.java
@@ -151,7 +151,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
      */
     protected GridCacheMapEntry(GridCacheContext<K, V> cctx, K key, int hash, V val,
         GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);
 
         if (cctx.portableEnabled())
             key = (K)cctx.kernalContext().portable().detachPortable(key);
@@ -1112,6 +1112,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         @Nullable UUID subjId,
         String taskName
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        log.info("Inner set " + key + " " + val + " " + ttl);
+
         V old;
 
         boolean valid = valid(tx != null ? tx.topologyVersion() : topVer);
@@ -1630,7 +1632,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         return toTtl(duration);
     }
 
-    private static long toTtl(Duration duration) {
+    public static long toTtl(Duration duration) {
         if (duration == null)
             return -1;
 
@@ -1685,7 +1687,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
         GridDrResolveResult<V> drRes = null;
 
-        long newTtl = 0L;
+        long newTtl = -1L;
         long newExpireTime = 0L;
         long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
 
@@ -1869,12 +1871,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     }
                 }
 
+                long ttl0 = newTtl;
+
                 if (drRes == null) {
                     // Calculate TTL and expire time for local update.
                     if (drTtl >= 0L) {
                         assert drExpireTime >= 0L;
 
-                        newTtl = drTtl;
+                        ttl0 = drTtl;
                         newExpireTime = drExpireTime;
                     }
                     else {
@@ -1902,10 +1906,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         else
                             newTtl = -1L;
 
-                        if (newTtl < 0)
-                            newTtl = ttlExtras();
+                        ttl0 = newTtl < 0 ? ttlExtras() : newTtl;
 
-                        newExpireTime = toExpireTime(newTtl);
+                        newExpireTime = toExpireTime(ttl0);
                     }
                 }
 
@@ -1937,7 +1940,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 // in load methods without actually holding entry lock.
                 updateIndex(updated, valBytes, newExpireTime, newVer, old);
 
-                update(updated, valBytes, newExpireTime, newTtl, newVer);
+                update(updated, valBytes, newExpireTime, ttl0, newVer);
 
                 drReplicate(drType, updated, valBytes, newVer);
 
@@ -2048,7 +2051,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 res = hadVal;
 
                 // Do not propagate zeroed TTL and expire time.
-                newTtl = 0L;
+                newTtl = -1L;
                 newDrExpireTime = -1L;
             }
 
@@ -2500,7 +2503,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
      * @param ttl Time to live.
      * @return Expiration time.
      */
-    protected long toExpireTime(long ttl) {
+    public static long toExpireTime(long ttl) {
         long expireTime = ttl == 0 ? 0 : U.currentTimeMillis() + ttl;
 
         // Account for overflow.
@@ -2953,14 +2956,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
-    /*
-    @Override public synchronized GridDrEntry<K, V> drEntry() throws IgniteCheckedException {
-        return new GridDrPlainEntry<>(key, isStartVersion() ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false),
-            ttlExtras(), expireTimeExtras(), ver.drVersion());
-    }
-    */
-
-    /** {@inheritDoc} */
     @Override public synchronized V rawPut(V val, long ttl) {
         V old = this.val;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index abb9fef..c6888f1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -25,6 +25,7 @@ import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.expiry.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
@@ -585,6 +586,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
             addGroupTxMapping(writeSet());
 
         if (!empty) {
+            log.info("User commit");
+
             // We are holding transaction-level locks for entries here, so we can get next write version.
             writeVersion(cctx.versions().next(topologyVersion()));
 
@@ -649,6 +652,19 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                     V val = res.get2();
                                     byte[] valBytes = res.get3();
 
+                                    if (op == CREATE || op == UPDATE && txEntry.drExpireTime() == -1L) {
+                                        ExpiryPolicy expiry = cacheCtx.expiry();
+
+                                        if (expiry != null) {
+                                            Duration duration = cached.hasValue() ?
+                                                expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
+
+                                            txEntry.ttl(GridCacheMapEntry.toTtl(duration));
+
+                                            log.info("Calculated expiry (userCommit), update=" + cached.hasValue() + ", ttl=" + txEntry.ttl() + ", detached=" + cached.detached());
+                                        }
+                                    }
+
                                     // Preserve TTL if needed.
                                     if (txEntry.ttl() < 0)
                                         txEntry.ttl(cached.ttl());
@@ -1154,7 +1170,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                             missed.put(key, ver);
 
                         if (!readCommitted()) {
-                            txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
+                            txEntry = addEntry(READ, val, null, entry, null, filter, true, -1L, -1L, null);
 
                             if (groupLock())
                                 txEntry.groupLockEntry(true);
@@ -1179,7 +1195,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                             // Value for which failure occurred.
                             V val = e.<V>value();
 
-                            txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
+                            txEntry = addEntry(READ, val, null, entry, null, CU.<K, V>empty(), false, -1L, -1L, null);
 
                             // Mark as checked immediately for non-pessimistic.
                             if (val != null && !pessimistic())
@@ -1698,7 +1714,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
      *
      * @param keys Keys to enlist.
      * @param cached Cached entry.
-     * @param ttl Time to live for entry. If negative, leave unchanged.
+     * @param expiry Expiry policy for entry. If {@code null}, leave unchanged.
      * @param implicit Implicit flag.
      * @param lookup Value lookup map ({@code null} for remove).
      * @param transformMap Map with transform closures if this is a transform operation.
@@ -1715,7 +1731,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
         GridCacheContext<K, V> cacheCtx,
         Collection<? extends K> keys,
         @Nullable GridCacheEntryEx<K, V> cached,
-        long ttl,
+        @Nullable ExpiryPolicy expiry,
         boolean implicit,
         @Nullable Map<? extends K, ? extends V> lookup,
         @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
@@ -1856,7 +1872,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                                 if (!readCommitted() && old != null) {
                                     // Enlist failed filters as reads for non-read-committed mode,
                                     // so future ops will get the same values.
-                                    txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
+                                    txEntry = addEntry(READ, old, null, entry, null, CU.<K, V>empty(), false, -1L, -1L,
                                         null);
 
                                     txEntry.markValid();
@@ -1869,7 +1885,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                             }
 
                             txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
-                                old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+                                old != null ? UPDATE : CREATE, val, transformClo, entry, expiry, filter, true, drTtl,
                                 drExpireTime, drVer);
 
                             if (!implicit() && readCommitted())
@@ -1956,7 +1972,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                         }
 
                         txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
-                            v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+                            v != null ? UPDATE : CREATE, val, transformClo, entry, expiry, filter, true, drTtl,
                             drExpireTime, drVer);
 
                         enlisted.add(key);
@@ -2219,11 +2235,13 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
 
             Collection<K> enlisted = new LinkedList<>();
 
+            GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+
             final IgniteFuture<Set<K>> loadFut = enlistWrite(
                 cacheCtx,
                 keySet,
                 cached,
-                ttl,
+                prj != null ? prj.expiry() : null,
                 implicit,
                 map0,
                 transformMap0,
@@ -2404,7 +2422,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                 cacheCtx,
                 keys0,
                 /** cached entry */null,
-                /** ttl */-1,
+                /** expiry */null,
                 implicit,
                 /** lookup map */null,
                 /** transform map */null,
@@ -2555,7 +2573,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                 cacheCtx,
                 keys,
                 /** cached entry */null,
-                /** ttl - leave unchanged */-1,
+                /** expiry - leave unchanged */null,
                 /** implicit */false,
                 /** lookup map */null,
                 /** transform map */null,
@@ -2652,7 +2670,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
     /**
      * @param op Cache operation.
      * @param val Value.
-     * @param ttl Time to leave to set to tx entry. If {@code null}, leave unchanged.
+     * @param expiryPlc Expiry policy, if {@code null}, leave unchanged.
      * @param transformClos Transform closure.
      * @param entry Cache entry.
      * @param filter Filter.
@@ -2662,10 +2680,16 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
      * @param drVer DR version.
      * @return Transaction entry.
      */
-    protected final GridCacheTxEntry<K, V> addEntry(GridCacheOperation op, @Nullable V val,
-        @Nullable IgniteClosure<V, V> transformClos, GridCacheEntryEx<K, V> entry, long ttl,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter, boolean filtersSet, long drTtl,
-        long drExpireTime, @Nullable GridCacheVersion drVer) {
+    protected final GridCacheTxEntry<K, V> addEntry(GridCacheOperation op,
+        @Nullable V val,
+        @Nullable IgniteClosure<V, V> transformClos,
+        GridCacheEntryEx<K, V> entry,
+        @Nullable ExpiryPolicy expiryPlc, // TODO IGNITE-41
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        boolean filtersSet,
+        long drTtl,
+        long drExpireTime,
+        @Nullable GridCacheVersion drVer) {
         GridCacheTxKey<K> key = entry.txKey();
 
         checkInternal(key);
@@ -2706,6 +2730,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
             old.cached(entry, old.keyBytes());
             old.filters(filter);
 
+            long ttl = -1L;
+
             // Update ttl if specified.
             if (drTtl >= 0L) {
                 assert drExpireTime >= 0L;
@@ -2721,6 +2747,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
                 log.debug("Updated transaction entry: " + txEntry);
         }
         else {
+            long ttl = -1L;
+
             if (drTtl >= 0L)
                 ttl = drTtl;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
index 048df15..43ca819 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -98,7 +98,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
     }
 
     /**
-     * @return New TTL.
+     * @return {@code -1} if TTL did not change, otherwise new TTL.
      */
     public long newTtl() {
         return newTtl;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
index f7fe27a..3a77884 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java
@@ -9,6 +9,9 @@
 
 package org.gridgain.grid.kernal.processors.cache.distributed;
 
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
 import javax.cache.expiry.*;
 import java.io.*;
 import java.util.concurrent.*;
@@ -27,11 +30,24 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
     private static final byte UPDATE_TTL_MASK = 0x02;
 
     /** */
+    private static final byte ACCESS_TTL_MASK = 0x04;
+
+    /** */
     private Duration forCreate;
 
     /** */
     private Duration forUpdate;
 
+    /** */
+    private Duration forAccess;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public GridCacheExpiryPolicy() {
+        // No-op.
+    }
+
     /**
      * @param plc Expiry policy.
      */
@@ -48,9 +64,7 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
 
     /** {@inheritDoc} */
     @Override public Duration getExpiryForAccess() {
-        assert false;
-
-        return null;
+        return forAccess;
     }
 
     /** {@inheritDoc} */
@@ -58,6 +72,38 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
         return forUpdate;
     }
 
+    /**
+     * @param out Output stream.
+     * @param duration Duration.
+     * @throws IOException
+     */
+    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
+        if (duration != null) {
+            if (duration.isEternal())
+                out.writeLong(0);
+            else if (duration.getDurationAmount() == 0)
+                out.writeLong(1);
+            else
+                out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
+        }
+    }
+
+    /**
+     * @param in Input stream.
+     * @return Duration.
+     * @throws IOException
+     */
+    private Duration readDuration(ObjectInput in) throws IOException {
+        long ttl = in.readLong();
+
+        assert ttl >= 0;
+
+        if (ttl == 0)
+            return Duration.ETERNAL;
+
+        return new Duration(TimeUnit.MILLISECONDS, ttl);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         byte flags = 0;
@@ -72,7 +118,18 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
         if (update != null)
             flags |= UPDATE_TTL_MASK;
 
+        Duration access = plc.getExpiryForAccess();
+
+        if (access != null)
+            flags |= ACCESS_TTL_MASK;
+
         out.writeByte(flags);
+
+        writeDuration(out, create);
+
+        writeDuration(out, update);
+
+        writeDuration(out, access);
     }
 
     /** {@inheritDoc} */
@@ -80,9 +137,16 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable {
         byte flags = in.readByte();
 
         if ((flags & CREATE_TTL_MASK) != 0)
-            forCreate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+            forCreate = readDuration(in);
 
         if ((flags & UPDATE_TTL_MASK) != 0)
-            forUpdate = new Duration(TimeUnit.MILLISECONDS, in.readLong());
+            forUpdate = readDuration(in);
+
+        if ((flags & ACCESS_TTL_MASK) != 0)
+            forAccess = readDuration(in);
+    }
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheExpiryPolicy.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3cd3e2d..3b34552 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -446,6 +446,8 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
     @SuppressWarnings({"CatchGenericClass"})
     private void commitIfLocked() throws IgniteCheckedException {
         if (state() == COMMITTING) {
+            log.info("commitIfLocked");
+
             for (GridCacheTxEntry<K, V> txEntry : writeMap.values()) {
                 assert txEntry != null : "Missing transaction entry for tx: " + this;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fd2d98d..64c95c3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1355,6 +1355,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.subjectId(),
                     taskName);
 
+                assert updRes.newTtl() == -1L || (expiry != null || updRes.drExpireTime() >= 0);
+
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
 
@@ -1366,7 +1368,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         GridDrResolveResult<V> ctx = updRes.drResolveResult();
 
                         long ttl = updRes.newTtl();
-                        long drExpireTime = updRes.drExpireTime();
+                        long expireTime = updRes.drExpireTime();
 
                         if (ctx == null)
                             newDrVer = null;
@@ -1380,19 +1382,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (req.forceTransformBackups() && op == TRANSFORM)
                             transformC = (IgniteClosure<V, V>)writeVal;
 
-                        if (!readersOnly)
+                        if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
                                 newValBytes,
                                 transformC,
-                                drExpireTime >= 0L ? ttl : -1L,
-                                drExpireTime,
-                                newDrVer,
-                                drExpireTime < 0L ? req.expiry() : null);
+                                updRes.newTtl(),
+                                expireTime,
+                                newDrVer);
+                        }
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes,
-                                transformC, drExpireTime < 0L ? req.expiry() : null);
+                            dhtFut.addNearWriteEntries(filteredReaders,
+                                entry,
+                                updRes.newValue(),
+                                newValBytes,
+                                transformC,
+                                ttl,
+                                expireTime);
                     }
                     else {
                         // TODO IGNITE-41 ttl could be changed.
@@ -1408,14 +1415,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
                             GridDrResolveResult<V> ctx = updRes.drResolveResult();
 
-                            // TODO IGNITE-41 dr ttl for near cache.
+                            long ttl = updRes.newTtl();
+                            long expireTime = updRes.drExpireTime();
 
                             if (ctx != null && ctx.isMerge())
                                 newValBytes = null;
 
                             // If put the same value as in request then do not need to send it back.
                             if (op == TRANSFORM || writeVal != updRes.newValue())
-                                res.addNearValue(i, updRes.newValue(), newValBytes);
+                                res.addNearValue(i,
+                                    updRes.newValue(),
+                                    newValBytes,
+                                    ttl,
+                                    expireTime);
+                            else
+                                res.addNearTtl(i, ttl, expireTime);
 
                             if (updRes.newValue() != null || newValBytes != null) {
                                 IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -1596,6 +1610,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         req.subjectId(),
                         taskName);
 
+                    assert updRes.newTtl() == -1L || expiry != null;
+
                     if (intercept) {
                         if (op == UPDATE)
                             ctx.config().getInterceptor().onAfterPut(entry.key(), updRes.newValue());
@@ -1624,25 +1640,42 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         IgniteClosure<V, V> transformC = transformMap == null ? null : transformMap.get(entry.key());
 
                         if (!batchRes.readersOnly())
-                            dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry());
+                            dhtFut.addWriteEntry(entry,
+                                writeVal,
+                                valBytes,
+                                transformC,
+                                updRes.newTtl(),
+                                -1,
+                                null);
 
                         if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC,
-                                req.expiry());
+                            dhtFut.addNearWriteEntries(filteredReaders,
+                                entry,
+                                writeVal,
+                                valBytes,
+                                transformC,
+                                updRes.newTtl(),
+                                -1);
                     }
 
                     if (hasNear) {
                         if (primary) {
                             if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
-                                if (req.operation() == TRANSFORM) {
-                                    int idx = firstEntryIdx + i;
+                                int idx = firstEntryIdx + i;
 
+                                if (req.operation() == TRANSFORM) {
                                     GridCacheValueBytes valBytesTuple = entry.valueBytes();
 
                                     byte[] valBytes = valBytesTuple.getIfMarshaled();
 
-                                    res.addNearValue(idx, writeVal, valBytes);
+                                    res.addNearValue(idx,
+                                        writeVal,
+                                        valBytes,
+                                        updRes.newTtl(),
+                                        -1);
                                 }
+                                else
+                                    res.addNearTtl(idx, updRes.newTtl(), -1);
 
                                 if (writeVal != null || !entry.valueBytes().isNull()) {
                                     IgniteFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -2037,8 +2070,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
         for (int i = 0; i < req.size(); i++) {
             K key = req.key(i);
 
@@ -2058,6 +2089,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 UPDATE :
                                 DELETE;
 
+                        long ttl = req.drTtl(i);
+                        long expireTime = req.drExpireTime(i);
+
+                        if (ttl != -1L && expireTime == -1L)
+                            expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
                         GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -2067,15 +2104,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             valBytes,
                             /*write-through*/false,
                             /*retval*/false,
-                            expiry,
+                            null,
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
                             /*check version*/!req.forceTransformBackups(),
                             CU.<K, V>empty(),
                             replicate ? DR_BACKUP : DR_NONE,
-                            req.drTtl(i),
-                            req.drExpireTime(i),
+                            ttl,
+                            expireTime,
                             req.drVersion(i),
                             false,
                             intercept,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3c7da7b..25bc875 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -202,7 +202,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param drTtl DR TTL (optional).
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
-     * @param expiryPlc Expiry policy.
      */
     public void addWriteEntry(GridDhtCacheEntry<K, V> entry,
         @Nullable V val,
@@ -210,8 +209,7 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
         IgniteClosure<V, V> transformC,
         long drTtl,
         long drExpireTime,
-        @Nullable GridCacheVersion drVer,
-        @Nullable ExpiryPolicy expiryPlc) {
+        @Nullable GridCacheVersion drVer) {
         long topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -237,7 +235,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                         writeVer,
                         syncMode,
                         topVer,
-                        expiryPlc,
                         forceTransformBackups,
                         this.updateReq.subjectId(),
                         this.updateReq.taskNameHash());
@@ -245,8 +242,14 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     mappings.put(nodeId, updateReq);
                 }
 
-                updateReq.addWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC, drTtl,
-                    drExpireTime, drVer);
+                updateReq.addWriteValue(entry.key(),
+                    entry.keyBytes(),
+                    val,
+                    valBytes,
+                    transformC,
+                    drTtl,
+                    drExpireTime,
+                    drVer);
             }
         }
     }
@@ -256,14 +259,16 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
      * @param entry Entry.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param expiryPlc Expiry policy..
+     * @param ttl TTL for near cache update (optional).
+     * @param expireTime Expire time for near cache update (optional).
      */
     public void addNearWriteEntries(Iterable<UUID> readers,
         GridDhtCacheEntry<K, V> entry,
         @Nullable V val,
         @Nullable byte[] valBytes,
         IgniteClosure<V, V> transformC,
-        @Nullable ExpiryPolicy expiryPlc) {
+        long ttl,
+        long expireTime) {
         GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
 
         keys.add(entry.key());
@@ -287,7 +292,6 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
                     writeVer,
                     syncMode,
                     topVer,
-                    expiryPlc,
                     forceTransformBackups,
                     this.updateReq.subjectId(),
                     this.updateReq.taskNameHash());
@@ -300,7 +304,13 @@ public class GridDhtAtomicUpdateFuture<K, V> extends GridFutureAdapter<Void>
 
             nearReadersEntries.put(entry.key(), entry);
 
-            updateReq.addNearWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC);
+            updateReq.addNearWriteValue(entry.key(),
+                entry.keyBytes(),
+                val,
+                valBytes,
+                transformC,
+                ttl,
+                expireTime);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index c3b0918..fda44c9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -77,14 +77,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     /** DR TTLs. */
     private GridLongList drExpireTimes;
 
-    /** Write synchronization mode. */
-    private GridCacheWriteSynchronizationMode syncMode;
+    /** Near TTLs. */
+    private GridLongList nearTtls;
 
-    /** Expiry policy. */
-    private ExpiryPolicy expiryPlc;
+    /** Near expire times. */
+    private GridLongList nearExpireTimes;
 
-    /** Expiry policy bytes. */
-    private byte[] expiryPlcBytes;
+    /** Write synchronization mode. */
+    private GridCacheWriteSynchronizationMode syncMode;
 
     /** Keys to update. */
     @GridToStringInclude
@@ -154,7 +154,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param writeVer Write version for cache values.
      * @param syncMode Cache write synchronization mode.
      * @param topVer Topology version.
-     * @param expiryPlc Expiry policy.
      * @param forceTransformBackups Force transform backups flag.
      * @param subjId Subject ID.
      */
@@ -165,7 +164,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         GridCacheVersion writeVer,
         GridCacheWriteSynchronizationMode syncMode,
         long topVer,
-        @Nullable ExpiryPolicy expiryPlc,
         boolean forceTransformBackups,
         UUID subjId,
         int taskNameHash
@@ -175,7 +173,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         this.futVer = futVer;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
-        this.expiryPlc = expiryPlc;
         this.topVer = topVer;
         this.forceTransformBackups = forceTransformBackups;
         this.subjId = subjId;
@@ -210,8 +207,14 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param drExpireTime DR expire time (optional).
      * @param drVer DR version (optional).
      */
-    public void addWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer) {
+    public void addWriteValue(K key,
+        @Nullable byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        IgniteClosure<V, V> transformC,
+        long drTtl,
+        long drExpireTime,
+        @Nullable GridCacheVersion drVer) {
         keys.add(key);
         this.keyBytes.add(keyBytes);
 
@@ -265,8 +268,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
      * @param val Value, {@code null} if should be removed.
      * @param valBytes Value bytes, {@code null} if should be removed.
      */
-    public void addNearWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes,
-        IgniteClosure<V, V> transformC) {
+    public void addNearWriteValue(K key,
+        @Nullable byte[] keyBytes,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        IgniteClosure<V, V> transformC,
+        long ttl,
+        long expireTime) {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
             nearKeyBytes = new ArrayList<>();
@@ -293,6 +301,28 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
             nearVals.add(val);
             nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
         }
+
+        if (ttl >= 0) {
+            if (nearTtls == null) {
+                nearTtls = new GridLongList(nearKeys.size());
+
+                for (int i = 0; i < nearKeys.size() - 1; i++)
+                    nearTtls.add(-1);
+            }
+
+            nearTtls.add(ttl);
+        }
+
+        if (expireTime >= 0) {
+            if (nearExpireTimes == null) {
+                nearExpireTimes = new GridLongList(nearKeys.size());
+
+                for (int i = 0; i < nearKeys.size() - 1; i++)
+                    nearExpireTimes.add(-1);
+            }
+
+            nearExpireTimes.add(expireTime);
+        }
     }
 
     /** {@inheritDoc} */
@@ -364,13 +394,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     }
 
     /**
-     * @return Expiry policy.
-     */
-    @Nullable public ExpiryPolicy expiry() {
-        return expiryPlc;
-    }
-
-    /**
      * @return Keys.
      */
     public Collection<K> keys() {
@@ -542,6 +565,20 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
     }
 
     /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    public long nearTtl(int idx) {
+        if (nearTtls != null) {
+            assert idx >= 0 && idx < nearTtls.size();
+
+            return nearTtls.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
      * @return DR TTLs.
      */
     @Nullable public GridLongList drExpireTimes() {
@@ -562,6 +599,20 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         return -1L;
     }
 
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    public long nearExpireTime(int idx) {
+        if (nearExpireTimes != null) {
+            assert idx >= 0 && idx < nearExpireTimes.size();
+
+            return nearExpireTimes.get(idx);
+        }
+
+        return -1L;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -625,7 +676,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         _clone.drTtls = drTtls;
         _clone.drExpireTimes = drExpireTimes;
         _clone.syncMode = syncMode;
-        _clone.expiryPlc = expiryPlc;
         _clone.nearKeys = nearKeys;
         _clone.nearKeyBytes = nearKeyBytes;
         _clone.nearVals = nearVals;
@@ -635,6 +685,8 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
         _clone.transformClosBytes = transformClosBytes;
         _clone.nearTransformClos = nearTransformClos;
         _clone.nearTransformClosBytes = nearTransformClosBytes;
+        _clone.nearExpireTimes = nearExpireTimes;
+        _clone.nearTtls = nearTtls;
         _clone.subjId = subjId;
         _clone.taskNameHash = taskNameHash;
     }
@@ -746,12 +798,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 11:
-                if (!commState.putByteArray(expiryPlcBytes))
-                    return false;
-
-                commState.idx++;
-
-            case 12:
                 if (valBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(valBytes.size()))
@@ -778,13 +824,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 13:
+            case 12:
                 if (!commState.putCacheVersion(writeVer))
                     return false;
 
                 commState.idx++;
 
-            case 14:
+            case 13:
                 if (nearKeyBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(nearKeyBytes.size()))
@@ -811,7 +857,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 15:
+            case 14:
                 if (nearValBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(nearValBytes.size()))
@@ -838,13 +884,13 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 16:
+            case 15:
                 if (!commState.putBoolean(forceTransformBackups))
                     return false;
 
                 commState.idx++;
 
-            case 17:
+            case 16:
                 if (nearTransformClosBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(nearTransformClosBytes.size()))
@@ -871,7 +917,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 18:
+            case 17:
                 if (transformClosBytes != null) {
                     if (commState.it == null) {
                         if (!commState.putInt(transformClosBytes.size()))
@@ -898,18 +944,29 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 19:
+            case 18:
                 if (!commState.putUuid(subjId))
                     return false;
 
                 commState.idx++;
 
-            case 20:
+            case 19:
                 if (!commState.putInt(taskNameHash))
                     return false;
 
                 commState.idx++;
 
+            case 20:
+                if (!commState.putLongList(nearExpireTimes))
+                    return false;
+
+                commState.idx++;
+
+            case 21:
+                if (!commState.putLongList(nearTtls))
+                    return false;
+
+                commState.idx++;
         }
 
         return true;
@@ -1041,16 +1098,6 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
                 commState.idx++;
 
             case 11:
-                byte[] expiryPlcBytes0 = commState.getByteArray();
-
-                if (expiryPlcBytes0 == BYTE_ARR_NOT_READ)
-                    return false;
-
-                expiryPlcBytes = expiryPlcBytes0;
-
-                commState.idx++;
-
-            case 12:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -1079,7 +1126,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 13:
+            case 12:
                 GridCacheVersion writeVer0 = commState.getCacheVersion();
 
                 if (writeVer0 == CACHE_VER_NOT_READ)
@@ -1089,7 +1136,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 14:
+            case 13:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -1118,7 +1165,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 15:
+            case 14:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -1147,7 +1194,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 16:
+            case 15:
                 if (buf.remaining() < 1)
                     return false;
 
@@ -1155,7 +1202,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 17:
+            case 16:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -1184,7 +1231,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 18:
+            case 17:
                 if (commState.readSize == -1) {
                     if (buf.remaining() < 4)
                         return false;
@@ -1213,7 +1260,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 19:
+            case 18:
                 UUID subjId0 = commState.getUuid();
 
                 if (subjId0 == UUID_NOT_READ)
@@ -1223,7 +1270,7 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
-            case 20:
+            case 19:
                 if (buf.remaining() < 4)
                     return false;
 
@@ -1231,6 +1278,25 @@ public class GridDhtAtomicUpdateRequest<K, V> extends GridCacheMessage<K, V> imp
 
                 commState.idx++;
 
+            case 20:
+                GridLongList nearExpireTimes0 = commState.getLongList();
+
+                if (nearExpireTimes0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                nearExpireTimes = nearExpireTimes0;
+
+                commState.idx++;
+
+            case 21:
+                GridLongList nearTtls0 = commState.getLongList();
+
+                if (nearTtls0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                nearTtls = nearTtls0;
+
+                commState.idx++;
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index fd4d7dc..0be44aa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.direct.*;
 import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
@@ -94,6 +95,12 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
     @GridDirectVersion(1)
     private GridCacheVersion nearVer;
 
+    /** Near TTLs. */
+    private GridLongList nearTtls;
+
+    /** Near expire times. */
+    private GridLongList nearExpireTimes;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -186,20 +193,87 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
      * @param keyIdx Key index.
      * @param val Value.
      * @param valBytes Value bytes.
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
      */
-    public void addNearValue(int keyIdx, @Nullable V val, @Nullable byte[] valBytes) {
+    public void addNearValue(int keyIdx,
+        @Nullable V val,
+        @Nullable byte[] valBytes,
+        long ttl,
+        long expireTime) {
         if (nearValsIdxs == null) {
             nearValsIdxs = new ArrayList<>();
             nearValBytes = new ArrayList<>();
             nearVals = new ArrayList<>();
         }
 
+        addNearTtl(keyIdx, ttl, expireTime);
+
         nearValsIdxs.add(keyIdx);
         nearVals.add(val);
         nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null);
     }
 
     /**
+     * @param ttl TTL for near cache update.
+     * @param expireTime Expire time for near cache update.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+        if (ttl >= 0) {
+            if (nearTtls == null) {
+                nearTtls = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearTtls.add(-1L);
+            }
+        }
+
+        if (nearTtls != null)
+            nearTtls.add(ttl);
+
+        if (expireTime >= 0) {
+            if (nearExpireTimes == null) {
+                nearExpireTimes = new GridLongList(16);
+
+                for (int i = 0; i < keyIdx; i++)
+                    nearExpireTimes.add(-1);
+            }
+        }
+
+        if (nearExpireTimes != null)
+            nearExpireTimes.add(expireTime);
+    }
+
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    public long nearExpireTime(int idx) {
+        if (nearExpireTimes != null) {
+            assert idx >= 0 && idx < nearExpireTimes.size();
+
+            return nearExpireTimes.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    public long nearTtl(int idx) {
+        if (nearTtls != null) {
+            assert idx >= 0 && idx < nearTtls.size();
+
+            return nearTtls.get(idx);
+        }
+
+        return -1L;
+    }
+
+    /**
      * @param nearVer Version generated on primary node to be used for originating node's near cache update.
      */
     public void nearVersion(GridCacheVersion nearVer) {
@@ -221,6 +295,8 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
             nearSkipIdxs = new ArrayList<>();
 
         nearSkipIdxs.add(keyIdx);
+
+        addNearTtl(keyIdx, -1L, -1L);
     }
 
     /**
@@ -366,6 +442,8 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
         _clone.nearVals = nearVals;
         _clone.nearValBytes = nearValBytes;
         _clone.nearVer = nearVer;
+        _clone.nearTtls = nearTtls;
+        _clone.nearExpireTimes = nearExpireTimes;
     }
 
     /** {@inheritDoc} */
@@ -501,6 +579,17 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
+            case 12:
+                if (!commState.putLongList(nearExpireTimes))
+                    return false;
+
+                commState.idx++;
+
+            case 13:
+                if (!commState.putLongList(nearTtls))
+                    return false;
+
+                commState.idx++;
         }
 
         return true;
@@ -662,6 +751,26 @@ public class GridNearAtomicUpdateResponse<K, V> extends GridCacheMessage<K, V> i
 
                 commState.idx++;
 
+            case 12:
+                GridLongList nearExpireTimes0 = commState.getLongList();
+
+                if (nearExpireTimes0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                nearExpireTimes = nearExpireTimes0;
+
+                commState.idx++;
+
+            case 13:
+                GridLongList nearTtls0 = commState.getLongList();
+
+                if (nearTtls0 == LONG_LIST_NOT_READ)
+                    return false;
+
+                nearTtls = nearTtls0;
+
+                commState.idx++;
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2aa32c3..1da6626 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -148,12 +148,19 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                 }
             }
 
+            long ttl = res.nearTtl(i);
+            long expireTime = res.nearExpireTime(i);
+
+            if (ttl != -1L && expireTime == -1L)
+                expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
             try {
                 processNearAtomicUpdateResponse(ver,
                     key,
                     val,
                     valBytes,
-                    req.expiry(),
+                    ttl,
+                    expireTime,
                     req.nodeId(),
                     req.subjectId(),
                     taskName);
@@ -169,7 +176,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
      * @param key Key.
      * @param val Value.
      * @param valBytes Value bytes.
-     * @param expiryPlc Expiry policy.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
      * @param nodeId Node ID.
      * @throws IgniteCheckedException If failed.
      */
@@ -178,7 +186,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         K key,
         @Nullable V val,
         @Nullable byte[] valBytes,
-        ExpiryPolicy expiryPlc,
+        long ttl,
+        long expireTime,
         UUID nodeId,
         UUID subjId,
         String taskName
@@ -203,15 +212,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         valBytes,
                         /*write-through*/false,
                         /*retval*/false,
-                        expiryPlc != null ? expiryPlc : ctx.expiry(),
+                        null,
                         /*event*/true,
                         /*metrics*/true,
                         /*primary*/false,
                         /*check version*/true,
                         CU.<K, V>empty(),
                         DR_NONE,
-                        -1,
-                        -1,
+                        ttl,
+                        expireTime,
                         null,
                         false,
                         false,
@@ -260,8 +269,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
-        ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry();
-
         for (int i = 0; i < req.nearSize(); i++) {
             K key = req.nearKey(i);
 
@@ -292,6 +299,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                                 UPDATE :
                                 DELETE;
 
+                        long ttl = req.nearTtl(i);
+                        long expireTime = req.nearExpireTime(i);
+
+                        if (ttl != -1L && expireTime == -1L)
+                            expireTime = GridCacheMapEntry.toExpireTime(ttl);
+
                         GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -301,15 +314,15 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             valBytes,
                             /*write-through*/false,
                             /*retval*/false,
-                            expiry,
+                            null,
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
                             /*check version*/!req.forceTransformBackups(),
                             CU.<K, V>empty(),
                             DR_NONE,
-                            -1,
-                            -1,
+                            ttl,
+                            expireTime,
                             null,
                             false,
                             intercept,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
index 0d22f62..f96e5a7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java
@@ -36,6 +36,9 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
     /** */
     private Factory<? extends ExpiryPolicy> factory;
 
+    /** If {@code true}, the grid named {@code getTestGridName(0)} is configured as NEAR_PARTITIONED. */
+    private boolean nearCache;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         // No-op.
@@ -48,7 +51,160 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
 
     /** {@inheritDoc} */
     @Override protected int gridCount() {
-        return 2;
+        return 3;
+    }
+
+    /**
+     * Creates and then updates the key returned by {@code primaryKey(cache(0))}, checking the TTL after each put.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimary() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+        nearCache = false;
+
+        boolean useTx = false;
+
+        startGrids();
+
+        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+        GridCache<Integer, Object> gridCache0 = cache(0);
+        Integer testKey = primaryKey(gridCache0);
+
+        log.info("Create: " + testKey);
+
+        // Create step: the first TestPolicy argument (60_000) is the expected TTL.
+        GridCacheTx createTx = useTx ? grid(0).transactions().txStart() : null;
+        jcache0.put(testKey, 1);
+        if (createTx != null)
+            createTx.commit();
+
+        checkTtl(testKey, 60_000);
+
+        // Update step: the second TestPolicy argument (61_000) is the expected TTL.
+        GridCacheTx updateTx = useTx ? grid(0).transactions().txStart() : null;
+        log.info("Update: " + testKey);
+        jcache0.put(testKey, 2);
+        if (updateTx != null)
+            updateTx.commit();
+
+        checkTtl(testKey, 61_000);
+    }
+
+    /**
+     * Creates and then updates the key returned by {@code backupKey(cache(0))}, checking the TTL after each put.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackup() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+        nearCache = false;
+
+        boolean useTx = false;
+
+        startGrids();
+
+        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+        GridCache<Integer, Object> gridCache0 = cache(0);
+        Integer testKey = backupKey(gridCache0);
+
+        log.info("Create: " + testKey);
+
+        // Create step: the first TestPolicy argument (60_000) is the expected TTL.
+        GridCacheTx createTx = useTx ? grid(0).transactions().txStart() : null;
+        jcache0.put(testKey, 1);
+        if (createTx != null)
+            createTx.commit();
+
+        checkTtl(testKey, 60_000);
+
+        // Update step: the second TestPolicy argument (61_000) is the expected TTL.
+        GridCacheTx updateTx = useTx ? grid(0).transactions().txStart() : null;
+        log.info("Update: " + testKey);
+        jcache0.put(testKey, 2);
+        if (updateTx != null)
+            updateTx.commit();
+
+        checkTtl(testKey, 61_000);
+    }
+
+    /**
+     * Creates and then updates, in transactions, the key from {@code nearKey(cache(0))}; checks TTL after each put.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNear() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null));
+
+        nearCache = false;
+
+        boolean useTx = true;
+
+        startGrids();
+
+        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+        GridCache<Integer, Object> gridCache0 = cache(0);
+        Integer testKey = nearKey(gridCache0);
+
+        log.info("Create: " + testKey);
+
+        // Create step: the first TestPolicy argument (60_000) is the expected TTL.
+        GridCacheTx createTx = useTx ? grid(0).transactions().txStart() : null;
+        jcache0.put(testKey, 1);
+        if (createTx != null)
+            createTx.commit();
+
+        checkTtl(testKey, 60_000);
+
+        // Update step: the second TestPolicy argument (61_000) is the expected TTL.
+        GridCacheTx updateTx = useTx ? grid(0).transactions().txStart() : null;
+        log.info("Update: " + testKey);
+        jcache0.put(testKey, 2);
+        if (updateTx != null)
+            updateTx.commit();
+
+        checkTtl(testKey, 61_000);
+    }
+
+    /**
+     * Runs two put passes over {@code keys()} inside transactions: the first writes 1, the second writes 2.
+     * No TTL checks are performed here.
+     *
+     * @throws Exception If failed.
+     */
+    public void test1() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+
+        nearCache = false;
+
+        boolean useTx = true;
+
+        startGrids();
+
+        Collection<Integer> testKeys = keys();
+
+        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+        // One log prefix per pass; the 1-based pass number is also the value written.
+        String[] logPrefixes = {"Test key1: ", "Test key2: "};
+
+        for (int pass = 1; pass <= logPrefixes.length; pass++) {
+            for (Integer testKey : testKeys) {
+                log.info(logPrefixes[pass - 1] + testKey);
+
+                GridCacheTx tx = null;
+
+                if (useTx)
+                    tx = grid(0).transactions().txStart();
+
+                jcache0.put(testKey, pass);
+
+                if (tx != null)
+                    tx.commit();
+            }
+        }
     }
 
     /**
@@ -93,6 +249,94 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
     }
 
     /**
+     * With a near cache on grid 0: puts a near key, then overrides the TTL per operation, first via grid 1
+     * and then via grid 0, waiting for expiration after each override.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNearPut() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+
+        nearCache = true;
+
+        startGrids();
+
+        GridCache<Integer, Object> gridCache0 = cache(0);
+        Integer testKey = nearKey(gridCache0);
+
+        IgniteCache<Integer, Integer> nearNodeCache = jcache(0);
+
+        nearNodeCache.put(testKey, 1);
+        checkTtl(testKey, 60_000);
+
+        IgniteCache<Integer, Integer> otherNodeCache = jcache(1);
+
+        // Update from another node with provided TTL.
+        TestPolicy otherNodePlc = new TestPolicy(null, 1000L, null);
+        otherNodeCache.withExpiryPolicy(otherNodePlc).put(testKey, 2);
+        checkTtl(testKey, 1000);
+
+        waitExpired(testKey);
+
+        // Reset the entry before the second round.
+        otherNodeCache.remove(testKey);
+        nearNodeCache.put(testKey, 1);
+        checkTtl(testKey, 60_000);
+
+        // Update from near node with provided TTL.
+        TestPolicy nearNodePlc = new TestPolicy(null, 1100L, null);
+        nearNodeCache.withExpiryPolicy(nearNodePlc).put(testKey, 2);
+        checkTtl(testKey, 1100);
+
+        waitExpired(testKey);
+    }
+
+    /**
+     * With a near cache on grid 0, checks TTL of bulk puts overridden per operation from grid 1, then from grid 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNearPutAll() throws Exception {
+        factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null));
+
+        nearCache = true;
+
+        startGrids();
+
+        Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < 1000; i++)
+            vals.put(i, i);
+
+        IgniteCache<Integer, Integer> jcache0 = jcache(0);
+
+        jcache0.putAll(vals);
+
+        for (Integer key : vals.keySet())
+            checkTtl(key, 60_000);
+
+        IgniteCache<Integer, Integer> jcache1 = jcache(1);
+
+        // Update from another node with provided TTL.
+        jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals);
+
+        for (Integer key : vals.keySet())
+            checkTtl(key, 1000);
+
+        waitExpired(vals.keySet());
+
+        jcache0.removeAll(vals.keySet());
+        jcache0.putAll(vals);
+
+        // Update from near node (grid 0) with provided TTL; mirrors the second round of testNearPut().
+        jcache0.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals);
+        for (Integer key : vals.keySet())
+            checkTtl(key, 1101L);
+
+        waitExpired(vals.keySet());
+    }
+
+    /**
      * @return Test keys.
      * @throws Exception If failed.
      */
@@ -106,7 +350,7 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
         if (gridCount() > 1) {
             keys.add(backupKey(cache));
 
-            if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED)
+            if (cache.configuration().getCacheMode() != REPLICATED)
                 keys.add(nearKey(cache));
         }
 
@@ -117,16 +361,27 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
      * @param key Key.
      * @throws Exception If failed.
      */
-    private void waitExpired(final Integer key) throws Exception {
+    private void waitExpired(Integer key) throws Exception {
+        waitExpired(Collections.singleton(key));
+    }
+
+    /**
+     * @param keys Keys.
+     * @throws Exception If failed.
+     */
+    private void waitExpired(final Collection<Integer> keys) throws Exception {
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 for (int i = 0; i < gridCount(); i++) {
-                    Object val = jcache(i).localPeek(key);
+                    for (Integer key : keys) {
+                        Object val = jcache(i).localPeek(key);
 
-                    log.info("Value [grid=" + i + ", val=" + val + ']');
+                        if (val != null) {
+                            // log.info("Value [grid=" + i + ", val=" + val + ']');
 
-                    if (val != null)
-                        return false;
+                            return false;
+                        }
+                    }
                 }
 
                 return false;
@@ -138,17 +393,23 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
         for (int i = 0; i < gridCount(); i++) {
             ClusterNode node = grid(i).cluster().localNode();
 
-            Object val = jcache(i).localPeek(key);
+            for (Integer key : keys) {
+                Object val = jcache(i).localPeek(key);
 
-            log.info("Value [grid=" + i +
-                ", primary=" + cache.affinity().isPrimary(node, key) +
-                ", backup=" + cache.affinity().isBackup(node, key) + ']');
+                if (val != null) {
+                    log.info("Unexpected value [grid=" + i +
+                        ", primary=" + cache.affinity().isPrimary(node, key) +
+                        ", backup=" + cache.affinity().isBackup(node, key) + ']');
+                }
 
-            assertNull("Unexpected non-null value for grid " + i, val);
+                assertNull("Unexpected non-null value for grid " + i, val);
+            }
         }
 
-        for (int i = 0; i < gridCount(); i++)
-            assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+        for (int i = 0; i < gridCount(); i++) {
+            for (Integer key : keys)
+                assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key));
+        }
     }
 
     /**
@@ -167,11 +428,8 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
             if (e == null && cache.context().isNear())
                 e = cache.context().near().dht().peekEx(key);
 
-            if (e == null) {
-                assertTrue(i > 0);
-
+            if (e == null)
                 assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key));
-            }
             else
                 assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl());
         }
@@ -184,10 +442,16 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
         GridCacheConfiguration cfg = super.cacheConfiguration(gridName);
 
         cfg.setCacheMode(PARTITIONED);
-        cfg.setAtomicityMode(ATOMIC);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+
+        //cfg.setAtomicityMode(ATOMIC);
+
         cfg.setBackups(1);
 
-        cfg.setDistributionMode(PARTITIONED_ONLY);
+        if (nearCache && gridName.equals(getTestGridName(0)))
+            cfg.setDistributionMode(NEAR_PARTITIONED);
+        else
+            cfg.setDistributionMode(PARTITIONED_ONLY);
 
         cfg.setExpiryPolicyFactory(factory);
 
@@ -213,11 +477,11 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest {
          * @param update TTL for update.
          */
         TestPolicy(@Nullable Long create,
-           @Nullable Long access,
-           @Nullable Long update) {
+            @Nullable Long update,
+            @Nullable Long access) {
             this.create = create;
-            this.access = access;
             this.update = update;
+            this.access = access;
         }
 
         /** {@inheritDoc} */


Mime
View raw message