Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ABFC89A50 for ; Tue, 16 Dec 2014 15:12:35 +0000 (UTC) Received: (qmail 74933 invoked by uid 500); 16 Dec 2014 15:12:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 74879 invoked by uid 500); 16 Dec 2014 15:12:31 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 74860 invoked by uid 99); 16 Dec 2014 15:12:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2014 15:12:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 16 Dec 2014 15:12:27 +0000 Received: (qmail 71718 invoked by uid 99); 16 Dec 2014 15:12:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Dec 2014 15:12:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 358F3A2C812; Tue, 16 Dec 2014 15:12:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 16 Dec 2014 15:12:21 -0000 Message-Id: <669738fe1f9145c68f592dc4875e2d38@git.apache.org> In-Reply-To: <89846a6580e14bc48d04fd4c457c9175@git.apache.org> References: <89846a6580e14bc48d04fd4c457c9175@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/18] incubator-ignite git commit: # ignite-41 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-41 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/688a2e71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/688a2e71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/688a2e71 Branch: refs/heads/ignite-41 Commit: 688a2e71509b9b3cebd148045d7388386e5ce0fb Parents: e85a938 Author: sboikov Authored: Tue Dec 16 13:06:26 2014 +0300 Committer: sboikov Committed: Tue Dec 16 18:11:24 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 3 +- .../processors/cache/GridCacheIoManager.java | 2 + .../processors/cache/GridCacheMapEntry.java | 31 +- .../cache/GridCacheTxLocalAdapter.java | 58 +++- .../cache/GridCacheUpdateAtomicResult.java | 2 +- .../distributed/GridCacheExpiryPolicy.java | 74 ++++- .../GridDistributedTxRemoteAdapter.java | 2 + .../dht/atomic/GridDhtAtomicCache.java | 79 +++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 30 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 172 ++++++---- .../atomic/GridNearAtomicUpdateResponse.java | 111 ++++++- .../distributed/near/GridNearAtomicCache.java | 35 ++- .../expiry/IgniteCacheExpiryPolicyTest.java | 310 +++++++++++++++++-- 13 files changed, 749 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index f1f4436..39b7338 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -2043,8 +2043,7 @@ public abstract class GridCacheAdapter extends GridMetadataAwareAdapter im return tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value(); } - @Override - public String toString() { + @Override public String toString() { return "put [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } })); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java index a222c32..a50e461 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java @@ -200,6 +200,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter implements GridCacheEntryEx */ protected GridCacheMapEntry(GridCacheContext cctx, K key, int hash, V val, GridCacheMapEntry next, long ttl, int hdrId) { - log = U.logger(cctx.kernalContext(), logRef, this); + log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); if (cctx.portableEnabled()) key = (K)cctx.kernalContext().portable().detachPortable(key); @@ -1112,6 +1112,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx @Nullable UUID subjId, String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException { + log.info("Inner set " + key + " " + val + " " + ttl); + V old; boolean valid = valid(tx != null ? tx.topologyVersion() : topVer); @@ -1630,7 +1632,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx return toTtl(duration); } - private static long toTtl(Duration duration) { + public static long toTtl(Duration duration) { if (duration == null) return -1; @@ -1685,7 +1687,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx GridDrResolveResult drRes = null; - long newTtl = 0L; + long newTtl = -1L; long newExpireTime = 0L; long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node. @@ -1869,12 +1871,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx } } + long ttl0 = newTtl; + if (drRes == null) { // Calculate TTL and expire time for local update. if (drTtl >= 0L) { assert drExpireTime >= 0L; - newTtl = drTtl; + ttl0 = drTtl; newExpireTime = drExpireTime; } else { @@ -1902,10 +1906,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx else newTtl = -1L; - if (newTtl < 0) - newTtl = ttlExtras(); + ttl0 = newTtl < 0 ? ttlExtras() : newTtl; - newExpireTime = toExpireTime(newTtl); + newExpireTime = toExpireTime(ttl0); } } @@ -1937,7 +1940,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx // in load methods without actually holding entry lock. updateIndex(updated, valBytes, newExpireTime, newVer, old); - update(updated, valBytes, newExpireTime, newTtl, newVer); + update(updated, valBytes, newExpireTime, ttl0, newVer); drReplicate(drType, updated, valBytes, newVer); @@ -2048,7 +2051,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx res = hadVal; // Do not propagate zeroed TTL and expire time. - newTtl = 0L; + newTtl = -1L; newDrExpireTime = -1L; } @@ -2500,7 +2503,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx * @param ttl Time to live. * @return Expiration time. */ - protected long toExpireTime(long ttl) { + public static long toExpireTime(long ttl) { long expireTime = ttl == 0 ? 0 : U.currentTimeMillis() + ttl; // Account for overflow. @@ -2953,14 +2956,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx } /** {@inheritDoc} */ - /* - @Override public synchronized GridDrEntry drEntry() throws IgniteCheckedException { - return new GridDrPlainEntry<>(key, isStartVersion() ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false), - ttlExtras(), expireTimeExtras(), ver.drVersion()); - } - */ - - /** {@inheritDoc} */ @Override public synchronized V rawPut(V val, long ttl) { V old = this.val; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index abb9fef..c6888f1 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -25,6 +25,7 @@ import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; +import javax.cache.expiry.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -585,6 +586,8 @@ public abstract class GridCacheTxLocalAdapter extends GridCacheTxAdapter extends GridCacheTxAdapter= 0L) ttl = drTtl; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java index 048df15..43ca819 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUpdateAtomicResult.java @@ -98,7 +98,7 @@ public class GridCacheUpdateAtomicResult { } /** - * @return New TTL. + * @return {@code -1} if TTL did not change, otherwise new TTL. */ public long newTtl() { return newTtl; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java index f7fe27a..3a77884 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheExpiryPolicy.java @@ -9,6 +9,9 @@ package org.gridgain.grid.kernal.processors.cache.distributed; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + import javax.cache.expiry.*; import java.io.*; import java.util.concurrent.*; @@ -27,11 +30,24 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { private static final byte UPDATE_TTL_MASK = 0x02; /** */ + private static final byte ACCESS_TTL_MASK = 0x04; + + /** */ private Duration forCreate; /** */ private Duration forUpdate; + /** */ + private Duration forAccess; + + /** + * Required by {@link Externalizable}. + */ + public GridCacheExpiryPolicy() { + // No-op. + } + /** * @param plc Expiry policy. */ @@ -48,9 +64,7 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { /** {@inheritDoc} */ @Override public Duration getExpiryForAccess() { - assert false; - - return null; + return forAccess; } /** {@inheritDoc} */ @@ -58,6 +72,38 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { return forUpdate; } + /** + * @param out Output stream. + * @param duration Duration. + * @throws IOException + */ + private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException { + if (duration != null) { + if (duration.isEternal()) + out.writeLong(0); + else if (duration.getDurationAmount() == 0) + out.writeLong(1); + else + out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount())); + } + } + + /** + * @param in Input stream. + * @return Duration. + * @throws IOException + */ + private Duration readDuration(ObjectInput in) throws IOException { + long ttl = in.readLong(); + + assert ttl >= 0; + + if (ttl == 0) + return Duration.ETERNAL; + + return new Duration(TimeUnit.MILLISECONDS, ttl); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { byte flags = 0; @@ -72,7 +118,18 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { if (update != null) flags |= UPDATE_TTL_MASK; + Duration access = plc.getExpiryForAccess(); + + if (access != null) + flags |= ACCESS_TTL_MASK; + out.writeByte(flags); + + writeDuration(out, create); + + writeDuration(out, update); + + writeDuration(out, access); } /** {@inheritDoc} */ @@ -80,9 +137,16 @@ public class GridCacheExpiryPolicy implements ExpiryPolicy, Externalizable { byte flags = in.readByte(); if ((flags & CREATE_TTL_MASK) != 0) - forCreate = new Duration(TimeUnit.MILLISECONDS, in.readLong()); + forCreate = readDuration(in); if ((flags & UPDATE_TTL_MASK) != 0) - forUpdate = new Duration(TimeUnit.MILLISECONDS, in.readLong()); + forUpdate = readDuration(in); + + if ((flags & ACCESS_TTL_MASK) != 0) + forAccess = readDuration(in); + } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheExpiryPolicy.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 3cd3e2d..3b34552 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -446,6 +446,8 @@ public class GridDistributedTxRemoteAdapter extends GridCacheTxAdapter txEntry : writeMap.values()) { assert txEntry != null : "Missing transaction entry for tx: " + this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index fd2d98d..64c95c3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1355,6 +1355,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.subjectId(), taskName); + assert updRes.newTtl() == -1L || (expiry != null || updRes.drExpireTime() >= 0); + if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -1366,7 +1368,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridDrResolveResult ctx = updRes.drResolveResult(); long ttl = updRes.newTtl(); - long drExpireTime = updRes.drExpireTime(); + long expireTime = updRes.drExpireTime(); if (ctx == null) newDrVer = null; @@ -1380,19 +1382,24 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (req.forceTransformBackups() && op == TRANSFORM) transformC = (IgniteClosure)writeVal; - if (!readersOnly) + if (!readersOnly) { dhtFut.addWriteEntry(entry, updRes.newValue(), newValBytes, transformC, - drExpireTime >= 0L ? ttl : -1L, - drExpireTime, - newDrVer, - drExpireTime < 0L ? req.expiry() : null); + updRes.newTtl(), + expireTime, + newDrVer); + } if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), newValBytes, - transformC, drExpireTime < 0L ? req.expiry() : null); + dhtFut.addNearWriteEntries(filteredReaders, + entry, + updRes.newValue(), + newValBytes, + transformC, + ttl, + expireTime); } else { // TODO IGNITE-41 ttl could be changed. @@ -1408,14 +1415,21 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { GridDrResolveResult ctx = updRes.drResolveResult(); - // TODO IGNITE-41 dr ttl for near cache. + long ttl = updRes.newTtl(); + long expireTime = updRes.drExpireTime(); if (ctx != null && ctx.isMerge()) newValBytes = null; // If put the same value as in request then do not need to send it back. if (op == TRANSFORM || writeVal != updRes.newValue()) - res.addNearValue(i, updRes.newValue(), newValBytes); + res.addNearValue(i, + updRes.newValue(), + newValBytes, + ttl, + expireTime); + else + res.addNearTtl(i, ttl, expireTime); if (updRes.newValue() != null || newValBytes != null) { IgniteFuture f = entry.addReader(node.id(), req.messageId(), topVer); @@ -1596,6 +1610,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.subjectId(), taskName); + assert updRes.newTtl() == -1L || expiry != null; + if (intercept) { if (op == UPDATE) ctx.config().getInterceptor().onAfterPut(entry.key(), updRes.newValue()); @@ -1624,25 +1640,42 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { IgniteClosure transformC = transformMap == null ? null : transformMap.get(entry.key()); if (!batchRes.readersOnly()) - dhtFut.addWriteEntry(entry, writeVal, valBytes, transformC, -1, -1, null, req.expiry()); + dhtFut.addWriteEntry(entry, + writeVal, + valBytes, + transformC, + updRes.newTtl(), + -1, + null); if (!F.isEmpty(filteredReaders)) - dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, valBytes, transformC, - req.expiry()); + dhtFut.addNearWriteEntries(filteredReaders, + entry, + writeVal, + valBytes, + transformC, + updRes.newTtl(), + -1); } if (hasNear) { if (primary) { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { - if (req.operation() == TRANSFORM) { - int idx = firstEntryIdx + i; + int idx = firstEntryIdx + i; + if (req.operation() == TRANSFORM) { GridCacheValueBytes valBytesTuple = entry.valueBytes(); byte[] valBytes = valBytesTuple.getIfMarshaled(); - res.addNearValue(idx, writeVal, valBytes); + res.addNearValue(idx, + writeVal, + valBytes, + updRes.newTtl(), + -1); } + else + res.addNearTtl(idx, updRes.newTtl(), -1); if (writeVal != null || !entry.valueBytes().isNull()) { IgniteFuture f = entry.addReader(node.id(), req.messageId(), topVer); @@ -2037,8 +2070,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); - for (int i = 0; i < req.size(); i++) { K key = req.key(i); @@ -2058,6 +2089,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { UPDATE : DELETE; + long ttl = req.drTtl(i); + long expireTime = req.drExpireTime(i); + + if (ttl != -1L && expireTime == -1L) + expireTime = GridCacheMapEntry.toExpireTime(ttl); + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -2067,15 +2104,15 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { valBytes, /*write-through*/false, /*retval*/false, - expiry, + null, /*event*/true, /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), CU.empty(), replicate ? DR_BACKUP : DR_NONE, - req.drTtl(i), - req.drExpireTime(i), + ttl, + expireTime, req.drVersion(i), false, intercept, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 3c7da7b..25bc875 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -202,7 +202,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter * @param drTtl DR TTL (optional). * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). - * @param expiryPlc Expiry policy. */ public void addWriteEntry(GridDhtCacheEntry entry, @Nullable V val, @@ -210,8 +209,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter IgniteClosure transformC, long drTtl, long drExpireTime, - @Nullable GridCacheVersion drVer, - @Nullable ExpiryPolicy expiryPlc) { + @Nullable GridCacheVersion drVer) { long topVer = updateReq.topologyVersion(); Collection dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -237,7 +235,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter writeVer, syncMode, topVer, - expiryPlc, forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash()); @@ -245,8 +242,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter mappings.put(nodeId, updateReq); } - updateReq.addWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC, drTtl, - drExpireTime, drVer); + updateReq.addWriteValue(entry.key(), + entry.keyBytes(), + val, + valBytes, + transformC, + drTtl, + drExpireTime, + drVer); } } } @@ -256,14 +259,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter * @param entry Entry. * @param val Value. * @param valBytes Value bytes. - * @param expiryPlc Expiry policy.. + * @param TTL for near cache update (optional). + * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable readers, GridDhtCacheEntry entry, @Nullable V val, @Nullable byte[] valBytes, IgniteClosure transformC, - @Nullable ExpiryPolicy expiryPlc) { + long ttl, + long expireTime) { GridCacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); keys.add(entry.key()); @@ -287,7 +292,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter writeVer, syncMode, topVer, - expiryPlc, forceTransformBackups, this.updateReq.subjectId(), this.updateReq.taskNameHash()); @@ -300,7 +304,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter nearReadersEntries.put(entry.key(), entry); - updateReq.addNearWriteValue(entry.key(), entry.keyBytes(), val, valBytes, transformC); + updateReq.addNearWriteValue(entry.key(), + entry.keyBytes(), + val, + valBytes, + transformC, + ttl, + expireTime); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index c3b0918..fda44c9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -77,14 +77,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp /** DR TTLs. */ private GridLongList drExpireTimes; - /** Write synchronization mode. */ - private GridCacheWriteSynchronizationMode syncMode; + /** Near TTLs. */ + private GridLongList nearTtls; - /** Expiry policy. */ - private ExpiryPolicy expiryPlc; + /** Near expire times. */ + private GridLongList nearExpireTimes; - /** Expiry policy bytes. */ - private byte[] expiryPlcBytes; + /** Write synchronization mode. */ + private GridCacheWriteSynchronizationMode syncMode; /** Keys to update. */ @GridToStringInclude @@ -154,7 +154,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp * @param writeVer Write version for cache values. * @param syncMode Cache write synchronization mode. * @param topVer Topology version. - * @param expiryPlc Expiry policy. * @param forceTransformBackups Force transform backups flag. * @param subjId Subject ID. */ @@ -165,7 +164,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp GridCacheVersion writeVer, GridCacheWriteSynchronizationMode syncMode, long topVer, - @Nullable ExpiryPolicy expiryPlc, boolean forceTransformBackups, UUID subjId, int taskNameHash @@ -175,7 +173,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp this.futVer = futVer; this.writeVer = writeVer; this.syncMode = syncMode; - this.expiryPlc = expiryPlc; this.topVer = topVer; this.forceTransformBackups = forceTransformBackups; this.subjId = subjId; @@ -210,8 +207,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp * @param drExpireTime DR expire time (optional). * @param drVer DR version (optional). */ - public void addWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure transformC, long drTtl, long drExpireTime, @Nullable GridCacheVersion drVer) { + public void addWriteValue(K key, + @Nullable byte[] keyBytes, + @Nullable V val, + @Nullable byte[] valBytes, + IgniteClosure transformC, + long drTtl, + long drExpireTime, + @Nullable GridCacheVersion drVer) { keys.add(key); this.keyBytes.add(keyBytes); @@ -265,8 +268,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp * @param val Value, {@code null} if should be removed. * @param valBytes Value bytes, {@code null} if should be removed. */ - public void addNearWriteValue(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - IgniteClosure transformC) { + public void addNearWriteValue(K key, + @Nullable byte[] keyBytes, + @Nullable V val, + @Nullable byte[] valBytes, + IgniteClosure transformC, + long ttl, + long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); nearKeyBytes = new ArrayList<>(); @@ -293,6 +301,28 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp nearVals.add(val); nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null); } + + if (ttl >= 0) { + if (nearTtls == null) { + nearTtls = new GridLongList(nearKeys.size()); + + for (int i = 0; i < nearKeys.size() - 1; i++) + nearTtls.add(-1); + } + + nearTtls.add(ttl); + } + + if (expireTime >= 0) { + if (nearExpireTimes == null) { + nearExpireTimes = new GridLongList(nearKeys.size()); + + for (int i = 0; i < nearKeys.size() - 1; i++) + nearExpireTimes.add(-1); + } + + nearExpireTimes.add(expireTime); + } } /** {@inheritDoc} */ @@ -364,13 +394,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp } /** - * @return Expiry policy. - */ - @Nullable public ExpiryPolicy expiry() { - return expiryPlc; - } - - /** * @return Keys. */ public Collection keys() { @@ -542,6 +565,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp } /** + * @param idx Index. + * @return TTL for near cache update. + */ + public long nearTtl(int idx) { + if (nearTtls != null) { + assert idx >= 0 && idx < nearTtls.size(); + + return nearTtls.get(idx); + } + + return -1L; + } + + /** * @return DR TTLs. */ @Nullable public GridLongList drExpireTimes() { @@ -562,6 +599,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp return -1L; } + /** + * @param idx Index. + * @return Expire time for near cache update. + */ + public long nearExpireTime(int idx) { + if (nearExpireTimes != null) { + assert idx >= 0 && idx < nearExpireTimes.size(); + + return nearExpireTimes.get(idx); + } + + return -1L; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { @@ -625,7 +676,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp _clone.drTtls = drTtls; _clone.drExpireTimes = drExpireTimes; _clone.syncMode = syncMode; - _clone.expiryPlc = expiryPlc; _clone.nearKeys = nearKeys; _clone.nearKeyBytes = nearKeyBytes; _clone.nearVals = nearVals; @@ -635,6 +685,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp _clone.transformClosBytes = transformClosBytes; _clone.nearTransformClos = nearTransformClos; _clone.nearTransformClosBytes = nearTransformClosBytes; + _clone.nearExpireTimes = nearExpireTimes; + _clone.nearTtls = nearTtls; _clone.subjId = subjId; _clone.taskNameHash = taskNameHash; } @@ -746,12 +798,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; case 11: - if (!commState.putByteArray(expiryPlcBytes)) - return false; - - commState.idx++; - - case 12: if (valBytes != null) { if (commState.it == null) { if (!commState.putInt(valBytes.size())) @@ -778,13 +824,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 13: + case 12: if (!commState.putCacheVersion(writeVer)) return false; commState.idx++; - case 14: + case 13: if (nearKeyBytes != null) { if (commState.it == null) { if (!commState.putInt(nearKeyBytes.size())) @@ -811,7 +857,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 15: + case 14: if (nearValBytes != null) { if (commState.it == null) { if (!commState.putInt(nearValBytes.size())) @@ -838,13 +884,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 16: + case 15: if (!commState.putBoolean(forceTransformBackups)) return false; commState.idx++; - case 17: + case 16: if (nearTransformClosBytes != null) { if (commState.it == null) { if (!commState.putInt(nearTransformClosBytes.size())) @@ -871,7 +917,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 18: + case 17: if (transformClosBytes != null) { if (commState.it == null) { if (!commState.putInt(transformClosBytes.size())) @@ -898,18 +944,29 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 19: + case 18: if (!commState.putUuid(subjId)) return false; commState.idx++; - case 20: + case 19: if (!commState.putInt(taskNameHash)) return false; commState.idx++; + case 20: + if (!commState.putLongList(nearExpireTimes)) + return false; + + commState.idx++; + + case 21: + if (!commState.putLongList(nearTtls)) + return false; + + commState.idx++; } return true; @@ -1041,16 +1098,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; case 11: - byte[] expiryPlcBytes0 = commState.getByteArray(); - - if (expiryPlcBytes0 == BYTE_ARR_NOT_READ) - return false; - - expiryPlcBytes = expiryPlcBytes0; - - commState.idx++; - - case 12: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -1079,7 +1126,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 13: + case 12: GridCacheVersion writeVer0 = commState.getCacheVersion(); if (writeVer0 == CACHE_VER_NOT_READ) @@ -1089,7 +1136,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 14: + case 13: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -1118,7 +1165,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 15: + case 14: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -1147,7 +1194,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 16: + case 15: if (buf.remaining() < 1) return false; @@ -1155,7 +1202,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 17: + case 16: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -1184,7 +1231,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 18: + case 17: if (commState.readSize == -1) { if (buf.remaining() < 4) return false; @@ -1213,7 +1260,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 19: + case 18: UUID subjId0 = commState.getUuid(); if (subjId0 == UUID_NOT_READ) @@ -1223,7 +1270,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; - case 20: + case 19: if (buf.remaining() < 4) return false; @@ -1231,6 +1278,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage imp commState.idx++; + case 20: + GridLongList nearExpireTimes0 = commState.getLongList(); + + if (nearExpireTimes0 == LONG_LIST_NOT_READ) + return false; + + nearExpireTimes = nearExpireTimes0; + + commState.idx++; + + case 21: + GridLongList nearTtls0 = commState.getLongList(); + + if (nearTtls0 == LONG_LIST_NOT_READ) + return false; + + nearTtls = nearTtls0; + + commState.idx++; } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index fd4d7dc..0be44aa 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.*; import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.internal.*; @@ -94,6 +95,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i @GridDirectVersion(1) private GridCacheVersion nearVer; + /** Near TTLs. */ + private GridLongList nearTtls; + + /** Near expire times. */ + private GridLongList nearExpireTimes; + /** * Empty constructor required by {@link Externalizable}. */ @@ -186,20 +193,87 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i * @param keyIdx Key index. * @param val Value. * @param valBytes Value bytes. + * @param ttl TTL for near cache update. + * @param expireTime Expire time for near cache update. */ - public void addNearValue(int keyIdx, @Nullable V val, @Nullable byte[] valBytes) { + public void addNearValue(int keyIdx, + @Nullable V val, + @Nullable byte[] valBytes, + long ttl, + long expireTime) { if (nearValsIdxs == null) { nearValsIdxs = new ArrayList<>(); nearValBytes = new ArrayList<>(); nearVals = new ArrayList<>(); } + addNearTtl(keyIdx, ttl, expireTime); + nearValsIdxs.add(keyIdx); nearVals.add(val); nearValBytes.add(valBytes != null ? GridCacheValueBytes.marshaled(valBytes) : null); } /** + * @param ttl TTL for near cache update. + * @param expireTime Expire time for near cache update. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + public void addNearTtl(int keyIdx, long ttl, long expireTime) { + if (ttl >= 0) { + if (nearTtls == null) { + nearTtls = new GridLongList(16); + + for (int i = 0; i < keyIdx; i++) + nearTtls.add(-1L); + } + } + + if (nearTtls != null) + nearTtls.add(ttl); + + if (expireTime >= 0) { + if (nearExpireTimes == null) { + nearExpireTimes = new GridLongList(16); + + for (int i = 0; i < keyIdx; i++) + nearExpireTimes.add(-1); + } + } + + if (nearExpireTimes != null) + nearExpireTimes.add(expireTime); + } + + /** + * @param idx Index. + * @return Expire time for near cache update. + */ + public long nearExpireTime(int idx) { + if (nearExpireTimes != null) { + assert idx >= 0 && idx < nearExpireTimes.size(); + + return nearExpireTimes.get(idx); + } + + return -1L; + } + + /** + * @param idx Index. + * @return TTL for near cache update. + */ + public long nearTtl(int idx) { + if (nearTtls != null) { + assert idx >= 0 && idx < nearTtls.size(); + + return nearTtls.get(idx); + } + + return -1L; + } + + /** * @param nearVer Version generated on primary node to be used for originating node's near cache update. */ public void nearVersion(GridCacheVersion nearVer) { @@ -221,6 +295,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i nearSkipIdxs = new ArrayList<>(); nearSkipIdxs.add(keyIdx); + + addNearTtl(keyIdx, -1L, -1L); } /** @@ -366,6 +442,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i _clone.nearVals = nearVals; _clone.nearValBytes = nearValBytes; _clone.nearVer = nearVer; + _clone.nearTtls = nearTtls; + _clone.nearExpireTimes = nearExpireTimes; } /** {@inheritDoc} */ @@ -501,6 +579,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i commState.idx++; + case 12: + if (!commState.putLongList(nearExpireTimes)) + return false; + + commState.idx++; + + case 13: + if (!commState.putLongList(nearTtls)) + return false; + + commState.idx++; } return true; @@ -662,6 +751,26 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage i commState.idx++; + case 12: + GridLongList nearExpireTimes0 = commState.getLongList(); + + if (nearExpireTimes0 == LONG_LIST_NOT_READ) + return false; + + nearExpireTimes = nearExpireTimes0; + + commState.idx++; + + case 13: + GridLongList nearTtls0 = commState.getLongList(); + + if (nearTtls0 == LONG_LIST_NOT_READ) + return false; + + nearTtls = nearTtls0; + + commState.idx++; + } return true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java index 2aa32c3..1da6626 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -148,12 +148,19 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { } } + long ttl = res.nearTtl(i); + long expireTime = res.nearExpireTime(i); + + if (ttl != -1L && expireTime == -1L) + expireTime = GridCacheMapEntry.toExpireTime(ttl); + try { processNearAtomicUpdateResponse(ver, key, val, valBytes, - req.expiry(), + ttl, + expireTime, req.nodeId(), req.subjectId(), taskName); @@ -169,7 +176,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { * @param key Key. * @param val Value. * @param valBytes Value bytes. - * @param expiryPlc Expiry policy. + * @param ttl TTL. + * @param expireTime Expire time. * @param nodeId Node ID. * @throws IgniteCheckedException If failed. */ @@ -178,7 +186,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { K key, @Nullable V val, @Nullable byte[] valBytes, - ExpiryPolicy expiryPlc, + long ttl, + long expireTime, UUID nodeId, UUID subjId, String taskName @@ -203,15 +212,15 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { valBytes, /*write-through*/false, /*retval*/false, - expiryPlc != null ? expiryPlc : ctx.expiry(), + null, /*event*/true, /*metrics*/true, /*primary*/false, /*check version*/true, CU.empty(), DR_NONE, - -1, - -1, + ttl, + expireTime, null, false, false, @@ -260,8 +269,6 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - ExpiryPolicy expiry = req.expiry() != null ? req.expiry() : ctx.expiry(); - for (int i = 0; i < req.nearSize(); i++) { K key = req.nearKey(i); @@ -292,6 +299,12 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { UPDATE : DELETE; + long ttl = req.nearTtl(i); + long expireTime = req.nearExpireTime(i); + + if (ttl != -1L && expireTime == -1L) + expireTime = GridCacheMapEntry.toExpireTime(ttl); + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -301,15 +314,15 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { valBytes, /*write-through*/false, /*retval*/false, - expiry, + null, /*event*/true, /*metrics*/true, /*primary*/false, /*check version*/!req.forceTransformBackups(), CU.empty(), DR_NONE, - -1, - -1, + ttl, + expireTime, null, false, intercept, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/688a2e71/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java index 0d22f62..f96e5a7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTest.java @@ -36,6 +36,9 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { /** */ private Factory factory; + /** */ + private boolean nearCache; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { // No-op. @@ -48,7 +51,160 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { /** {@inheritDoc} */ @Override protected int gridCount() { - return 2; + return 3; + } + + public void testPrimary() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = false; + + boolean inTx = false; + + startGrids(); + + IgniteCache cache = jcache(0); + + GridCache cache0 = cache(0); + + Integer key = primaryKey(cache0); + + log.info("Create: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 60_000); + + tx = inTx ? grid(0).transactions().txStart() : null; + + log.info("Update: " + key); + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 61_000); + } + + public void testBackup() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = false; + + boolean inTx = false; + + startGrids(); + + IgniteCache cache = jcache(0); + + GridCache cache0 = cache(0); + + Integer key = backupKey(cache0); + + log.info("Create: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 60_000); + + tx = inTx ? grid(0).transactions().txStart() : null; + + log.info("Update: " + key); + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 61_000); + } + + public void testNear() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, 61_000L, null)); + + nearCache = false; + + boolean inTx = true; + + startGrids(); + + IgniteCache cache = jcache(0); + + GridCache cache0 = cache(0); + + Integer key = nearKey(cache0); + + log.info("Create: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + + checkTtl(key, 60_000); + + tx = inTx ? grid(0).transactions().txStart() : null; + + log.info("Update: " + key); + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + + checkTtl(key, 61_000); + } + + /** + * @throws Exception If failed. + */ + public void test1() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); + + nearCache = false; + + boolean inTx = true; + + startGrids(); + + Collection keys = keys(); + + IgniteCache cache = jcache(0); + + for (final Integer key : keys) { + log.info("Test key1: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; + + cache.put(key, 1); + + if (tx != null) + tx.commit(); + } + + for (final Integer key : keys) { + log.info("Test key2: " + key); + + GridCacheTx tx = inTx ? grid(0).transactions().txStart() : null; + + cache.put(key, 2); + + if (tx != null) + tx.commit(); + } } /** @@ -93,6 +249,94 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { } /** + * @throws Exception If failed. + */ + public void testNearPut() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); + + nearCache = true; + + startGrids(); + + GridCache cache0 = cache(0); + + Integer key = nearKey(cache0); + + IgniteCache jcache0 = jcache(0); + + jcache0.put(key, 1); + + checkTtl(key, 60_000); + + IgniteCache jcache1 = jcache(1); + + // Update from another node with provided TTL. + jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).put(key, 2); + + checkTtl(key, 1000); + + waitExpired(key); + + jcache1.remove(key); + + jcache0.put(key, 1); + + checkTtl(key, 60_000); + + // Update from near node with provided TTL. + jcache0.withExpiryPolicy(new TestPolicy(null, 1100L, null)).put(key, 2); + + checkTtl(key, 1100); + + waitExpired(key); + } + + /** + * @throws Exception If failed. + */ + public void testNearPutAll() throws Exception { + factory = new FactoryBuilder.SingletonFactory<>(new TestPolicy(60_000L, null, null)); + + nearCache = true; + + startGrids(); + + Map vals = new HashMap<>(); + + for (int i = 0; i < 1000; i++) + vals.put(i, i); + + IgniteCache jcache0 = jcache(0); + + jcache0.putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 60_000); + + IgniteCache jcache1 = jcache(1); + + // Update from another node with provided TTL. + jcache1.withExpiryPolicy(new TestPolicy(null, 1000L, null)).putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 1000); + + waitExpired(vals.keySet()); + + jcache0.removeAll(vals.keySet()); + + jcache0.putAll(vals); + + // Update from near node with provided TTL. + jcache1.withExpiryPolicy(new TestPolicy(null, 1101L, null)).putAll(vals); + + for (Integer key : vals.keySet()) + checkTtl(key, 1101L); + + waitExpired(vals.keySet()); + } + + /** * @return Test keys. * @throws Exception If failed. */ @@ -106,7 +350,7 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { if (gridCount() > 1) { keys.add(backupKey(cache)); - if (cache.configuration().getDistributionMode() == NEAR_PARTITIONED) + if (cache.configuration().getCacheMode() != REPLICATED) keys.add(nearKey(cache)); } @@ -117,16 +361,27 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { * @param key Key. * @throws Exception If failed. */ - private void waitExpired(final Integer key) throws Exception { + private void waitExpired(Integer key) throws Exception { + waitExpired(Collections.singleton(key)); + } + + /** + * @param keys Keys. + * @throws Exception If failed. + */ + private void waitExpired(final Collection keys) throws Exception { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { for (int i = 0; i < gridCount(); i++) { - Object val = jcache(i).localPeek(key); + for (Integer key : keys) { + Object val = jcache(i).localPeek(key); - log.info("Value [grid=" + i + ", val=" + val + ']'); + if (val != null) { + // log.info("Value [grid=" + i + ", val=" + val + ']'); - if (val != null) - return false; + return false; + } + } } return false; @@ -138,17 +393,23 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { for (int i = 0; i < gridCount(); i++) { ClusterNode node = grid(i).cluster().localNode(); - Object val = jcache(i).localPeek(key); + for (Integer key : keys) { + Object val = jcache(i).localPeek(key); - log.info("Value [grid=" + i + - ", primary=" + cache.affinity().isPrimary(node, key) + - ", backup=" + cache.affinity().isBackup(node, key) + ']'); + if (val != null) { + log.info("Unexpected value [grid=" + i + + ", primary=" + cache.affinity().isPrimary(node, key) + + ", backup=" + cache.affinity().isBackup(node, key) + ']'); + } - assertNull("Unexpected non-null value for grid " + i, val); + assertNull("Unexpected non-null value for grid " + i, val); + } } - for (int i = 0; i < gridCount(); i++) - assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key)); + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + assertNull("Unexpected non-null value for grid " + i, jcache(i).get(key)); + } } /** @@ -167,11 +428,8 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { if (e == null && cache.context().isNear()) e = cache.context().near().dht().peekEx(key); - if (e == null) { - assertTrue(i > 0); - + if (e == null) assertTrue(!cache.affinity().isPrimaryOrBackup(grid.localNode(), key)); - } else assertEquals("Unexpected ttl for grid " + i, ttl, e.ttl()); } @@ -184,10 +442,16 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { GridCacheConfiguration cfg = super.cacheConfiguration(gridName); cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(ATOMIC); + cfg.setAtomicityMode(TRANSACTIONAL); + + //cfg.setAtomicityMode(ATOMIC); + cfg.setBackups(1); - cfg.setDistributionMode(PARTITIONED_ONLY); + if (nearCache && gridName.equals(getTestGridName(0))) + cfg.setDistributionMode(NEAR_PARTITIONED); + else + cfg.setDistributionMode(PARTITIONED_ONLY); cfg.setExpiryPolicyFactory(factory); @@ -213,11 +477,11 @@ public class IgniteCacheExpiryPolicyTest extends IgniteCacheTest { * @param update TTL for update. */ TestPolicy(@Nullable Long create, - @Nullable Long access, - @Nullable Long update) { + @Nullable Long update, + @Nullable Long access) { this.create = create; - this.access = access; this.update = update; + this.access = access; } /** {@inheritDoc} */