Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 236452009F9 for ; Mon, 23 May 2016 18:43:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2202C160A05; Mon, 23 May 2016 16:43:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C7BA5160A24 for ; Mon, 23 May 2016 18:43:14 +0200 (CEST) Received: (qmail 91995 invoked by uid 500); 23 May 2016 16:43:13 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 91745 invoked by uid 99); 23 May 2016 16:43:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 May 2016 16:43:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F07EDFF07; Mon, 23 May 2016 16:43:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 23 May 2016 16:43:14 -0000 Message-Id: <9124468ca0714613a12cdd57bf1e6004@git.apache.org> In-Reply-To: <3f0157b5dfbf4af58040f60a3c801037@git.apache.org> References: <3f0157b5dfbf4af58040f60a3c801037@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] ignite git commit: ignite-2899 Fixed issues with 'keepBinary' handling. This closes #593. archived-at: Mon, 23 May 2016 16:43:17 -0000 ignite-2899 Fixed issues with 'keepBinary' handling. This closes #593. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee7e2c7d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee7e2c7d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee7e2c7d Branch: refs/heads/master Commit: ee7e2c7d55a98f496e82ccd544090297b9e33e38 Parents: e96fd87 Author: sboikov Authored: Mon May 23 19:42:45 2016 +0300 Committer: sboikov Committed: Mon May 23 19:42:45 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 30 +- .../processors/cache/GridCacheMapEntry.java | 73 +- .../processors/cache/GridCacheReturn.java | 10 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 15 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../local/atomic/GridLocalAtomicCache.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 4 +- ...GetEntryOptimisticReadCommittedSeltTest.java | 2 +- .../IgniteCacheConfigVariationsFullApiTest.java | 50 +- ...rceptorCacheConfigVariationsFullApiTest.java | 118 ++ ...terceptorWithKeepBinaryCacheFullApiTest.java | 124 ++ .../cache/WithKeepBinaryCacheFullApiTest.java | 1234 ++++++++++++++++++ ...gniteComputeConfigVariationsFullApiTest.java | 12 +- .../ConfigVariationsTestSuiteBuilder.java | 9 + ...IgniteCacheConfigVariationsAbstractTest.java | 16 +- .../IgniteConfigVariationsAbstractTest.java | 160 ++- ...orCacheConfigVariationsFullApiTestSuite.java | 41 + ...ryCacheConfigVariationsFullApiTestSuite.java | 71 + 20 files changed, 1873 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9ea688d..b68a8ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1407,9 +1407,16 @@ public abstract class GridCacheAdapter implements IgniteInternalCache t = (T2)get(key, !ctx.keepBinary(), true); - CacheEntry val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; + CacheEntry val = t != null ? new CacheEntryImplEx<>( + ctx.keepBinary() ? (K)ctx.unwrapBinaryIfNeeded(key, true, false) : key, + t.get1(), + t.get2()) + : null; if (ctx.config().getInterceptor() != null) { V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); @@ -1454,8 +1461,10 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> fut = - (IgniteInternalFuture>)getAsync(key, !ctx.keepBinary(), true); + (IgniteInternalFuture>)getAsync(key0, !ctx.keepBinary(), true); final boolean intercept = ctx.config().getInterceptor() != null; @@ -1465,16 +1474,21 @@ public abstract class GridCacheAdapter implements IgniteInternalCache t = f.get(); - CacheEntry val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; + CacheEntry val = t != null ? new CacheEntryImplEx<>( + ctx.keepBinary() ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false) : key0, + t.get1(), + t.get2()) + : null; + if (intercept) { - V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + V val0 = (V)ctx.config().getInterceptor().onGet(key0, t != null ? val.getValue() : null); - return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null); + return val0 != null ? new CacheEntryImplEx(key0, val0, t != null ? 
t.get2() : null) : null; } else return val; - } - }); + } + }); if (statsEnabled) fut.listen(new UpdateGetTimeStatClosure>(metrics0(), start)); @@ -1720,7 +1734,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(false, cctx.unwrapTemporary(value(old0, old, false)), invokeRes); + return new GridTuple3<>(false, cctx.unwrapTemporary(value(old0, old, keepBinary, false)), invokeRes); else { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -1729,6 +1729,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (ttl == CU.TTL_ZERO) op = GridCacheOperation.DELETE; + boolean hasValPtr = false; + // Try write-through. if (op == GridCacheOperation.UPDATE) { // Detach value before index update. @@ -1772,7 +1774,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, key); - boolean hasValPtr = hasOffHeapPointer(); + hasValPtr = hasOffHeapPointer(); if (old == null) old = saveValueForIndexUnlocked(); @@ -1783,12 +1785,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); - if (cctx.offheapTiered() && hasValPtr) { - boolean rmv = cctx.swap().removeOffheap(key); - - assert rmv; - } - if (evt) { CacheObject evtOld = null; @@ -1838,6 +1834,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L)); } + + if (op != GridCacheOperation.UPDATE) { + if (cctx.offheapTiered() && hasValPtr) { + boolean rmv = cctx.swap().removeOffheap(key); + + assert rmv; + } + } } return new GridTuple3<>(res, @@ -2326,10 +2330,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteBiTuple interceptRes = null; + boolean hasValPtr = false; + // Actual update. 
if (op == GridCacheOperation.UPDATE) { if (intercept) { - updated0 = value(updated0, updated, false); + updated0 = value(updated0, updated, keepBinary, false); Object interceptorVal = cctx.config().getInterceptor() .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0); @@ -2461,7 +2467,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme enqueueVer = newVer; - boolean hasValPtr = hasOffHeapPointer(); + hasValPtr = hasOffHeapPointer(); // Clear value on backup. Entry will be removed from cache when it got evicted from queue. update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true); @@ -2469,12 +2475,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert newSysTtl == CU.TTL_NOT_CHANGED; assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; - if (cctx.offheapTiered() && hasValPtr) { - boolean rmv = cctx.swap().removeOffheap(key); - - assert rmv; - } - clearReaders(); recordNodeId(affNodeId, topVer); @@ -2548,6 +2548,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (interceptRes != null) oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); } + + if (op != GridCacheOperation.UPDATE) { + if (cctx.offheapTiered() && hasValPtr) { + boolean rmv = cctx.swap().removeOffheap(key); + + assert rmv; + } + } } if (log.isDebugEnabled()) @@ -2568,14 +2576,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param val Value. * @param cacheObj Cache object. + * @param keepBinary Keep binary flag. * @param cpy Copy flag. * @return Cache object value. */ - @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean cpy) { + @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) { if (val != null) return val; - return cacheObj != null ? 
cacheObj.value(cctx.cacheObjectContext(), cpy) : null; + return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index b3bce09..29e74db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -218,6 +219,7 @@ public class GridCacheReturn implements Externalizable, Message { * @param key0 Key value. * @param res Result. * @param err Error. + * @param keepBinary Keep binary. */ @SuppressWarnings("unchecked") public synchronized void addEntryProcessResult( @@ -225,7 +227,8 @@ public class GridCacheReturn implements Externalizable, Message { KeyCacheObject key, @Nullable Object key0, @Nullable Object res, - @Nullable Exception err) { + @Nullable Exception err, + boolean keepBinary) { assert v == null || v instanceof Map : v; assert key != null; assert res != null || err != null; @@ -243,7 +246,10 @@ public class GridCacheReturn implements Externalizable, Message { CacheInvokeResult res0 = err == null ? CacheInvokeResult.fromResult(res) : CacheInvokeResult.fromError(err); - resMap.put(key0 != null ? 
key0 : CU.value(key, cctx, true), res0); + Object resKey = key0 != null ? key0 : + ((keepBinary && key instanceof BinaryObject) ? key : CU.value(key, cctx, true)); + + resMap.put(resKey, res0); } else { assert v == null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 94031ac..9491d5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -432,7 +432,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture extends GridDhtCacheAdapter { if (invokeRes == null) invokeRes = new GridCacheReturn(node.isLocal()); - invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), computed, null); + computed = ctx.unwrapTemporary(computed); + + invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), computed, null, + req.keepBinary()); } if (!invokeEntry.modified()) @@ -1888,7 +1891,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (invokeRes == null) invokeRes = new GridCacheReturn(node.isLocal()); - invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), null, e); + invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), null, e, req.keepBinary()); updated = old; } @@ -2022,8 +2025,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { entry.key(), old, req.keepBinary()), - updated.value( - ctx.cacheObjectContext(), + ctx.unwrapBinaryIfNeeded( + 
updated, + req.keepBinary(), false)); if (val == null) @@ -2366,7 +2370,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { k, null, compRes.get1(), - compRes.get2()); + compRes.get2(), + req.keepBinary()); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 0ae434a..d0850e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -236,7 +236,7 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte return (V)(val); } - return (V)map.get(key); + return (V)F.firstValue(map); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index cb6152d..e1d12b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -412,7 +412,7 @@ public class 
GridLocalAtomicCache extends GridLocalCache { assert m.isEmpty() || m.size() == 1 : m.size(); - return m.get(key); + return F.firstValue(m); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 45a3fff..8d50e2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -943,7 +942,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * @param ver Entry version. 
*/ public void entryReadVersion(GridCacheVersion ver) { - assert this.serReadVer == null; + assert this.serReadVer == null: "Wrong version [serReadVer=" + serReadVer + ", ver=" + ver + "]"; assert ver != null; this.serReadVer = ver; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d492051..f9ef423 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2824,10 +2824,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } if (res != null) - ret.addEntryProcessResult(ctx, txEntry.key(), key0, res, null); + ret.addEntryProcessResult(ctx, txEntry.key(), key0, res, null, txEntry.keepBinary()); } catch (Exception e) { - ret.addEntryProcessResult(ctx, txEntry.key(), key0, null, e); + ret.addEntryProcessResult(ctx, txEntry.key(), key0, null, e, txEntry.keepBinary()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java index acc21df..c04612d 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java @@ -33,4 +33,4 @@ public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryA @Override protected TransactionIsolation isolation() { return TransactionIsolation.READ_COMMITTED; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index ded5a14..5e8f162 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@ -161,6 +161,9 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar } }; + /** */ + public static final int CNT = 20; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return TEST_TIMEOUT; @@ -2474,31 +2477,50 @@ public class IgniteCacheConfigVariationsFullApiTest extends IgniteCacheConfigVar /** * @throws Exception If failed. 
*/ + @SuppressWarnings("serial") public void testGetAndRemoveObject() throws Exception { - IgniteCache cache = ignite(0).cache(cacheName()); + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + IgniteCache cache = ignite(0).cache(cacheName()); - SerializableObject val1 = new SerializableObject(1); - SerializableObject val2 = new SerializableObject(2); + Map map = new HashMap() {{ + for (int i = 0; i < CNT; i++) + put("key" + i, value(i)); + }}; - cache.put("key1", val1); - cache.put("key2", val2); + for (Map.Entry e : map.entrySet()) { + final String key = e.getKey(); + final Object val = e.getValue(); - assert !cache.remove("key1", new SerializableObject(0)); + cache.put(key, val); - SerializableObject oldVal = cache.get("key1"); + assertFalse(cache.remove(key, new SerializableObject(-1))); - assert oldVal != null && F.eq(val1, oldVal); + Object oldVal = cache.get(key); - assert cache.remove("key1"); + assertNotNull(oldVal); + assertEquals(val, oldVal); - assert cache.get("key1") == null; + assertTrue(cache.remove(key)); - SerializableObject oldVal2 = cache.getAndRemove("key2"); + assertNull(cache.get(key)); + } - assert F.eq(val2, oldVal2); + for (Map.Entry e : map.entrySet()) { + final String key = e.getKey(); + final Object val = e.getValue(); - assert cache.get("key2") == null; - assert cache.getAndRemove("key2") == null; + cache.put(key, val); + + Object oldVal = cache.getAndRemove(key); + + assertEquals(val, oldVal); + + assertNull(cache.get(key)); + assertNull(cache.getAndRemove(key)); + } + } + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorCacheConfigVariationsFullApiTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorCacheConfigVariationsFullApiTest.java new file mode 100644 index 0000000..b6f5333 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorCacheConfigVariationsFullApiTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheInterceptor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * Full API cache test. 
+ */ +@SuppressWarnings({"TransientFieldInNonSerializableClass", "unchecked"}) +public class InterceptorCacheConfigVariationsFullApiTest extends IgniteCacheConfigVariationsFullApiTest { + /** */ + private static volatile boolean validate = true; + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cc = super.cacheConfiguration(); + + cc.setInterceptor(new TestInterceptor()); + + return cc; + } + + /** {@inheritDoc} */ + @Override public void testTtlNoTx() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testTtlNoTxOldEntry() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testTtlTx() throws Exception { + // No-op. + } + + /** + * + */ + private static class TestInterceptor implements CacheInterceptor { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public V onGet(K key, V val) { + if (validate && val != null) + assertFalse("Val: " + val, val instanceof BinaryObject); + + return val; + } + + /** {@inheritDoc} */ + @Nullable @Override public V onBeforePut(Cache.Entry e, V newVal) { + if (validate) { + validateEntry(e); + + if (newVal != null) + assertFalse("NewVal: " + newVal, newVal instanceof BinaryObject); + } + + return newVal; + } + + /** {@inheritDoc} */ + @Override public void onAfterPut(Cache.Entry entry) { + validateEntry(entry); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteBiTuple onBeforeRemove(Cache.Entry entry) { + validateEntry(entry); + + return new IgniteBiTuple<>(false, entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void onAfterRemove(Cache.Entry entry) { + validateEntry(entry); + } + + /** + * @param e Value. 
+ */ + private void validateEntry(Cache.Entry e) { + assertNotNull(e); + assertNotNull(e.getKey()); + + if (validate) { + assertFalse("Key: " + e.getKey(), e.getKey() instanceof BinaryObject); + + if (e.getValue() != null) + assertFalse("Val: " + e.getValue(), e.getValue() instanceof BinaryObject); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorWithKeepBinaryCacheFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorWithKeepBinaryCacheFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorWithKeepBinaryCacheFullApiTest.java new file mode 100644 index 0000000..d16e6e7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/InterceptorWithKeepBinaryCacheFullApiTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import javax.cache.Cache; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheInterceptor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +@SuppressWarnings("unchecked") +public class InterceptorWithKeepBinaryCacheFullApiTest extends WithKeepBinaryCacheFullApiTest { + /** */ + private static volatile boolean validate; + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(IgniteConfiguration cfg, String cacheName) { + CacheConfiguration cc = super.cacheConfiguration(cfg, cacheName); + + cc.setInterceptor(new TestInterceptor()); + + return cc; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + validate = true; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + validate = false; + + super.afterTest(); + } + + /** + * + */ + private static class TestInterceptor implements CacheInterceptor { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Nullable @Override public V onGet(K key, V val) { + // TODO IGNITE-2973: should always validate key here, but cannot due to the bug. 
+ validate(key, val, false, true); + + return val; + } + + /** {@inheritDoc} */ + @Nullable @Override public V onBeforePut(Cache.Entry e, V newVal) { + if (validate) { + validate(e.getKey(), e.getValue(), true, true); + + if (newVal != null) + assertEquals("NewVal: " + newVal, interceptorBinaryObjExp, newVal instanceof BinaryObject); + } + + return newVal; + } + + /** {@inheritDoc} */ + @Override public void onAfterPut(Cache.Entry entry) { + validate(entry.getKey(), entry.getValue(), true, false); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteBiTuple onBeforeRemove(Cache.Entry entry) { + validate(entry.getKey(), entry.getValue(), true, true); + + return new IgniteBiTuple<>(false, entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void onAfterRemove(Cache.Entry entry) { + validate(entry.getKey(), entry.getValue(), true, true); + } + + /** + * @param key Key. + * @param val Value. + * @param validateKey Validate key flag. + * @param validateVal Validate value flag. + */ + private void validate(K key, V val, boolean validateKey, boolean validateVal) { + assertNotNull(key); + + if (validate) { + if (validateKey) + assertTrue("Key: " + key, key instanceof BinaryObject); + + if (val != null) { + // TODO IGNITE-2973: should always do this check, but cannot due to the bug. 
+ if (validateVal && interceptorBinaryObjExp) + assertTrue("Val: " + val, val instanceof BinaryObject); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WithKeepBinaryCacheFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WithKeepBinaryCacheFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WithKeepBinaryCacheFullApiTest.java new file mode 100644 index 0000000..1954a8d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/WithKeepBinaryCacheFullApiTest.java @@ -0,0 +1,1234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest.DataMode.PLANE_OBJECT;
+import static org.apache.ignite.testframework.junits.IgniteConfigVariationsAbstractTest.DataMode.SERIALIZABLE;
+
+/**
+ * Exercises cache operations through a {@code withKeepBinary()} cache and checks that the keys and values
+ * handed back to the caller are {@link BinaryObject}s that deserialize to the expected user objects.
+ */
+@SuppressWarnings("unchecked")
+public class WithKeepBinaryCacheFullApiTest extends IgniteCacheConfigVariationsAbstractTest {
+    /**
+     * Set to {@code atomicityMode() == TRANSACTIONAL} by the invoke tests while {@link #INC_ENTRY_PROC_USER_OBJ}
+     * is in use and reset to {@code true} afterwards (see TODO IGNITE-2973).
+     * NOTE(review): presumably read by interceptor-based subclasses - confirm.
+     */
+    protected static volatile boolean interceptorBinaryObjExp = true;
+
+    /** Number of keys each test works with. */
+    public static final int CNT = 10;
+
+    /** Entry processor that touches nothing and always returns {@code null}. */
+    public static final CacheEntryProcessor NOOP_ENTRY_PROC = new CacheEntryProcessor() {
+        @Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
+            return null;
+        }
+    };
+
+    /**
+     * Entry processor that stores a plain (non-binary) user object: {@code value(0, mode)} when the entry is
+     * empty, otherwise the value with the next id. The data mode is passed as {@code arguments[0]}.
+     * Returns the previous value deserialized, or {@code null} if there was none.
+     */
+    public static final CacheEntryProcessor INC_ENTRY_PROC_USER_OBJ = new CacheEntryProcessor() {
+        @Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
+            // The processor itself must see binary keys and values.
+            assertTrue(entry.getKey() instanceof BinaryObject);
+
+            Object val = entry.getValue();
+
+            int valId = 0;
+
+            if (val != null) {
+                assertTrue(val instanceof BinaryObject);
+
+                valId = valueOf(((BinaryObject)val).deserialize()) + 1;
+            }
+
+            Object newVal = value(valId, (DataMode)arguments[0]);
+
+            // This variant deliberately writes the user object, not its binary form.
+            assertFalse(newVal instanceof BinaryObject);
+
+            entry.setValue(newVal);
+
+            return val == null ? null : ((BinaryObject)val).deserialize();
+        }
+    };
+
+    /**
+     * Same increment logic as {@link #INC_ENTRY_PROC_USER_OBJ}, but the new value is converted with
+     * {@code binary().toBinary(..)} before it is stored, and the previous value is returned as is
+     * (binary object or {@code null}).
+     */
+    public static final CacheEntryProcessor INC_ENTRY_PROC_BINARY_OBJ = new CacheEntryProcessor() {
+        @Override public Object process(MutableEntry entry, Object... arguments) throws EntryProcessorException {
+            assertTrue(entry.getKey() instanceof BinaryObject);
+
+            Object val = entry.getValue();
+
+            int valId = 0;
+
+            if (val != null) {
+                assertTrue(val instanceof BinaryObject);
+
+                valId = valueOf(((BinaryObject)val).deserialize()) + 1;
+            }
+
+            Object newVal = value(valId, (DataMode)arguments[0]);
+
+            // The Ignite instance is obtained from the entry to reach the binary API.
+            Object newBinaryVal = ((Ignite)entry.unwrap(Ignite.class)).binary().toBinary(newVal);
+
+            entry.setValue(newBinaryVal);
+
+            return val;
+        }
+    };
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cc = super.cacheConfiguration();
+
+        // On top of the inherited configuration, switch the store to keep-binary mode.
+        cc.setStoreKeepBinary(true);
+
+        return cc;
+    }
+
+    /**
+     * @throws Exception If failed.
+ */ + @SuppressWarnings("serial") + public void testRemovePutGet() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + for (Object key : keys) + cache.remove(key); + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + for (Object key : keys) { + assertNull(cache.get(key)); + assertNull(cache.getEntry(key)); + } + } + }); + + for (final Object key : keys) { + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + Object val = value(valueOf(key)); + + cache.put(key, val); + + BinaryObject retVal = (BinaryObject)cache.get(key); + + assertEquals(val, retVal.deserialize()); + + CacheEntry entry = cache.getEntry(key); + + assertTrue(entry.getKey() instanceof BinaryObject); + + assertEquals(val, entry.getValue().deserialize()); + + assertTrue(cache.remove(key)); + } + }); + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testRemovePutGetAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary().withAsync(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + for (Object key : keys) { + cache.remove(key); + + cache.future().get(); + } + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + for (Object key : keys) { + cache.get(key); + assertNull(cache.future().get()); + + cache.getEntry(key); + assertNull(cache.future().get()); + } + } + }); + + + for (final Object key : keys) { + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + Object val = value(valueOf(key)); + + cache.put(key, val); + + cache.future().get(); + + cache.get(key); + BinaryObject retVal = (BinaryObject)cache.future().get(); + + assertEquals(val, retVal.deserialize()); + + cache.getEntry(key); + CacheEntry e = (CacheEntry)cache.future().get(); + + assertEquals(key, deserializeBinary(e.getKey())); + + assertEquals(val, e.getValue().deserialize()); + } + }); + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testPutAllGetAll() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + for (Object val : cache.getAll(keys).values()) + assertNull(val); + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + Collection entries = cache.getEntries(keys); + + for (CacheEntry e : entries) + assertNull(e.getValue()); + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + Map keyValMap = new LinkedHashMap() {{ + for (Object key : keys) { + Object val = value(valueOf(key)); + + put(key, val); + } + }}; + + cache.putAll(keyValMap); + + Set> set = cache.getAll(keys).entrySet(); + + for (Map.Entry e : set) { + Object expVal = value(valueOf(e.getKey().deserialize())); + + assertEquals(expVal, e.getValue().deserialize()); + } + + Collection> entries = cache.getEntries(keys); + + for (CacheEntry e : entries) { + assertTrue(e.getKey() instanceof BinaryObject); + + Object expVal = value(valueOf(e.getKey().deserialize())); + + assertEquals(expVal, e.getValue().deserialize()); + } + + cache.removeAll(keys); + } + }); + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testPutAllGetAllAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary().withAsync(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + cache.getAll(keys); + Map res = (Map)cache.future().get(); + + for (Object val : res.values()) + assertNull(val); + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + cache.getEntries(keys); + + Collection entries = (Collection)cache.future().get(); + + for (CacheEntry e : entries) + assertNull(e.getValue()); + } + }); + + runInAllTxModes(new TestRunnable() { + @Override public void run() throws Exception { + Map keyValMap = new LinkedHashMap() {{ + for (Object key : keys) { + Object val = value(valueOf(key)); + + put(key, val); + } + }}; + + cache.putAll(keyValMap); + + cache.future().get(); + + cache.getAll(keys); + Set> set = ((Map)cache.future().get()).entrySet(); + + for (Map.Entry e : set) { + Object expVal = value(valueOf(e.getKey().deserialize())); + + assertEquals(expVal, e.getValue().deserialize()); + } + + cache.getEntries(keys); + + Collection> entries = + (Collection>)cache.future().get(); + + for (CacheEntry e : entries) { + assertTrue(e.getKey() instanceof BinaryObject); + + Object expVal = value(valueOf(e.getKey().deserialize())); + + assertEquals(expVal, e.getValue().deserialize()); + } + + cache.removeAll(keys); + + cache.future().get(); + } + }); + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+     */
+    @SuppressWarnings("serial")
+    public void testInvoke() throws Exception {
+        // Checks IgniteCache.invoke() with a no-op processor, a binary-writing processor and a
+        // user-object-writing processor on a keep-binary cache.
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                final IgniteCache cache = jcache().withKeepBinary();
+
+                Set keys = new LinkedHashSet() {{
+                    for (int i = 0; i < CNT; i++)
+                        add(key(i));
+                }};
+
+                // The no-op processor returns null and must not create an entry.
+                for (final Object key : keys) {
+                    Object res = cache.invoke(key, NOOP_ENTRY_PROC);
+
+                    assertNull(res);
+
+                    assertNull(cache.get(key));
+                }
+
+                // First call creates value(0) and returns null; second call returns the old value and stores value(1).
+                for (final Object key : keys) {
+                    Object res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                    assertNull(res);
+
+                    assertEquals(value(0), deserializeBinary(cache.get(key)));
+
+                    res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                    assertEquals(value(0), deserializeBinary(res));
+
+                    assertEquals(value(1), deserializeBinary(cache.get(key)));
+
+                    assertTrue(cache.remove(key));
+                }
+
+                // TODO IGNITE-2973: should be always false.
+                interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
+
+                try {
+                    for (final Object key : keys) {
+                        Object res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                        assertNull(res);
+
+                        assertEquals(value(0), deserializeBinary(cache.get(key)));
+
+                        res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                        // TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
+//                        assertEquals(value(0), res);
+
+                        assertEquals(value(1), deserializeBinary(cache.get(key)));
+
+                        assertTrue(cache.remove(key));
+                    }
+                }
+                finally {
+                    // Always restore the default so later tests are not affected.
+                    interceptorBinaryObjExp = true;
+                }
+            }
+        }, PLANE_OBJECT, SERIALIZABLE);
+    }
+
+    /**
+     * Runs {@link #checkInvokeTx} for every concurrency/isolation pair except OPTIMISTIC + SERIALIZABLE
+     * and clears the cache after each pair. Does nothing when {@code txShouldBeUsed()} is {@code false}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvokeTx() throws Exception {
+        if (!txShouldBeUsed())
+            return;
+
+        for (TransactionConcurrency conc : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                // TODO IGNITE-2971: delete this if when the issue will be fixed.
+                if (conc == TransactionConcurrency.OPTIMISTIC && isolation == TransactionIsolation.SERIALIZABLE)
+                    continue;
+
+                info(">>>>> Executing test using explicite txs [concurrency=" + conc + ", isolation=" + isolation + "]");
+
+                checkInvokeTx(conc, isolation);
+
+                jcache().removeAll();
+            }
+        }
+    }
+
+    /**
+     * Same scenario as {@link #testInvoke()}, with each invoke/remove wrapped in an explicit transaction.
+     *
+     * @param conc Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("serial")
+    public void checkInvokeTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                final IgniteCache cache = jcache().withKeepBinary();
+
+                Set keys = new LinkedHashSet() {{
+                    for (int i = 0; i < CNT; i++)
+                        add(key(i));
+                }};
+
+                // All no-op invokes share one transaction.
+                try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                    for (final Object key : keys) {
+                        Object res = cache.invoke(key, NOOP_ENTRY_PROC);
+
+                        assertNull(res);
+
+                        assertNull(cache.get(key));
+                    }
+
+                    tx.commit();
+                }
+
+                for (final Object key : keys) {
+                    Object res;
+
+                    try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                        res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                        tx.commit();
+                    }
+
+                    // Results are verified after the commit, outside the transaction.
+                    assertNull(res);
+
+                    assertEquals(value(0), deserializeBinary(cache.get(key)));
+
+                    try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                        res = cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                        tx.commit();
+                    }
+
+                    assertEquals(value(0), deserializeBinary(res));
+
+                    assertEquals(value(1), deserializeBinary(cache.get(key)));
+
+                    try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                        assertTrue(cache.remove(key));
+
+                        tx.commit();
+                    }
+                }
+
+                // TODO IGNITE-2973: should be always false.
+                interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
+
+                try {
+                    for (final Object key : keys) {
+                        Object res;
+
+                        try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                            res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                            tx.commit();
+                        }
+
+                        assertNull(res);
+
+                        assertEquals(value(0), deserializeBinary(cache.get(key)));
+
+                        try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                            res = cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                            tx.commit();
+                        }
+
+                        // TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
+//                        assertEquals(value(0), res);
+
+                        assertEquals(value(1), deserializeBinary(cache.get(key)));
+
+                        try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) {
+                            assertTrue(cache.remove(key));
+
+                            tx.commit();
+                        }
+                    }
+                }
+                finally {
+                    // Always restore the default so later tests are not affected.
+                    interceptorBinaryObjExp = true;
+                }
+            }
+        }, PLANE_OBJECT, SERIALIZABLE);
+    }
+
+    /**
+     * Async counterpart of {@link #testInvoke()}: each call is issued on a {@code withAsync()} cache and
+     * its result is read from {@code cache.future()}.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("serial")
+    public void testInvokeAsync() throws Exception {
+        runInAllDataModes(new TestRunnable() {
+            @Override public void run() throws Exception {
+                final IgniteCache cache = jcache().withKeepBinary().withAsync();
+
+                Set keys = new LinkedHashSet() {{
+                    for (int i = 0; i < CNT; i++)
+                        add(key(i));
+                }};
+
+                for (final Object key : keys) {
+                    cache.invoke(key, NOOP_ENTRY_PROC);
+
+                    Object res = cache.future().get();
+
+                    assertNull(res);
+
+                    cache.get(key);
+
+                    assertNull(cache.future().get());
+                }
+
+                for (final Object key : keys) {
+                    cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                    Object res = cache.future().get();
+
+                    assertNull(res);
+
+                    cache.get(key);
+
+                    assertEquals(value(0), deserializeBinary(cache.future().get()));
+
+                    cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode);
+
+                    res = cache.future().get();
+
+                    assertEquals(value(0), deserializeBinary(res));
+
+                    cache.get(key);
+
+                    assertEquals(value(1), deserializeBinary(cache.future().get()));
+
+                    cache.remove(key);
+
+                    assertTrue((Boolean)cache.future().get());
+                }
+
+                // TODO IGNITE-2973: should be always false.
+                interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL;
+
+                try {
+                    for (final Object key : keys) {
+                        cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                        Object res = cache.future().get();
+
+                        assertNull(res);
+
+                        cache.get(key);
+
+                        assertEquals(value(0), deserializeBinary(cache.future().get()));
+
+                        cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode);
+
+                        res = cache.future().get();
+
+                        // TODO IGNITE-2953: uncomment the following assert when the issue will be fixed.
+//                        assertEquals(value(0), res);
+
+                        cache.get(key);
+
+                        assertEquals(value(1), deserializeBinary(cache.future().get()));
+
+                        cache.remove(key);
+
+                        assertTrue((Boolean)cache.future().get());
+                    }
+                }
+                finally {
+                    // Always restore the default so later tests are not affected.
+                    interceptorBinaryObjExp = true;
+                }
+            }
+        }, PLANE_OBJECT, SERIALIZABLE);
+    }
+
+    /**
+     * Runs {@link #checkInvokeAsyncTx} for every concurrency/isolation pair except OPTIMISTIC + SERIALIZABLE
+     * and clears the cache after each pair. Does nothing when {@code txShouldBeUsed()} is {@code false}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInvokeAsyncTx() throws Exception {
+        if (!txShouldBeUsed())
+            return;
+
+        for (TransactionConcurrency conc : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                // TODO IGNITE-2971: delete this if when the issue will be fixed.
+                if (conc == TransactionConcurrency.OPTIMISTIC && isolation == TransactionIsolation.SERIALIZABLE)
+                    continue;
+
+                checkInvokeAsyncTx(conc, isolation);
+
+                jcache().removeAll();
+            }
+        }
+    }
+
+    /**
+     * @param conc Concurrency.
+     * @param isolation Isolation.
+     * @throws Exception If failed.
+ */ + @SuppressWarnings("serial") + public void checkInvokeAsyncTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary().withAsync(); + + Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + for (final Object key : keys) { + cache.invoke(key, NOOP_ENTRY_PROC); + + Object res = cache.future().get(); + + assertNull(res); + + cache.get(key); + + assertNull(cache.future().get()); + } + + tx.commit(); + } + + for (final Object key : keys) { + Object res; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + res = cache.future().get(); + + tx.commit(); + } + + assertNull(res); + + cache.get(key); + + assertEquals(value(0), deserializeBinary(cache.future().get())); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invoke(key, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + tx.commit(); + } + + res = cache.future().get(); + + assertEquals(value(0), deserializeBinary(res)); + + cache.get(key); + + assertEquals(value(1), deserializeBinary(cache.future().get())); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.remove(key); + + assertTrue((Boolean)cache.future().get()); + + tx.commit(); + } + } + + // TODO IGNITE-2973: should be always false. 
+ interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL; + + try { + for (final Object key : keys) { + Object res; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode); + + res = cache.future().get(); + + tx.commit(); + } + + assertNull(res); + + cache.get(key); + + assertEquals(value(0), deserializeBinary(cache.future().get())); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invoke(key, INC_ENTRY_PROC_USER_OBJ, dataMode); + + res = cache.future().get(); + + tx.commit(); + } + + // TODO IGNITE-2953: uncomment the following assert when the issue will be fixed. +// assertEquals(value(0), res); + + cache.get(key); + + assertEquals(value(1), deserializeBinary(cache.future().get())); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.remove(key); + + assertTrue((Boolean)cache.future().get()); + + tx.commit(); + } + } + } + finally { + interceptorBinaryObjExp = true; + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testInvokeAll() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary(); + + Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + Map> resMap = cache.invokeAll(keys, NOOP_ENTRY_PROC); + + for (Map.Entry> e : resMap.entrySet()) { + assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject); + + assertNull(e.getValue().get()); + } + + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + checkInvokeAllResult(cache, resMap, null, value(0), true); + + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + checkInvokeAllResult(cache, resMap, value(0), value(1), true); + + cache.removeAll(keys); + + // TODO IGNITE-2973: should be always false. + interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL; + + try { + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + checkInvokeAllResult(cache, resMap, null, value(0), false); + + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + checkInvokeAllResult(cache, resMap, value(0), value(1), false); + } + finally { + interceptorBinaryObjExp = true; + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("serial") + public void testInvokeAllTx() throws Exception { + if (!txShouldBeUsed()) + return; + + for (TransactionConcurrency conc : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + checkInvokeAllTx(conc, isolation); + + jcache().removeAll(); + } + } + } + + /** + * @param conc Concurrency. + * @param isol Isolation. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + private void checkInvokeAllTx(final TransactionConcurrency conc, final TransactionIsolation isol) throws Exception { + if (!txShouldBeUsed()) + return; + + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + Map> resMap = cache.invokeAll(keys, NOOP_ENTRY_PROC); + + for (Map.Entry> e : resMap.entrySet()) { + assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject); + + assertNull(e.getValue().get()); + } + + tx.commit(); + } + + Map> resMap; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + tx.commit(); + } + + checkInvokeAllResult(cache, resMap, null, value(0), true); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + tx.commit(); + } + + checkInvokeAllResult(cache, resMap, value(0), value(1), true); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + cache.removeAll(keys); + + tx.commit(); + } + + // TODO IGNITE-2973: should be always false. 
+ interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL; + + try { + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + tx.commit(); + } + + checkInvokeAllResult(cache, resMap, null, value(0), false); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + resMap = cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + tx.commit(); + } + + checkInvokeAllResult(cache, resMap, value(0), value(1), false); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isol)) { + cache.removeAll(keys); + + tx.commit(); + } + } + finally { + interceptorBinaryObjExp = true; + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @param cache Cache. + * @param resMap Result map. + * @param expRes Expected result. + * @param cacheVal Expected cache value for key. + * @param deserializeRes Deseriallize result flag. + */ + private void checkInvokeAllResult(IgniteCache cache, Map> resMap, + Object expRes, Object cacheVal, boolean deserializeRes) { + for (Map.Entry> e : resMap.entrySet()) { + info("Key: " + e.getKey()); + + assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject); + + Object res = e.getValue().get(); + + // TODO IGNITE-2953: delete the following if when the issue wiil be fixed. + if (deserializeRes) + assertEquals(expRes, deserializeRes ? deserializeBinary(res) : res); + + if (cache.get(e.getKey()) == null) + cache.get(e.getKey()); + + assertEquals(cacheVal, deserializeBinary(cache.get(e.getKey()))); + } + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testInvokeAllAsync() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary().withAsync(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + cache.invokeAll(keys, NOOP_ENTRY_PROC); + + Map> resMap = + (Map>)cache.future().get(); + + for (Map.Entry> e : resMap.entrySet()) { + assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject); + + assertNull(e.getValue().get()); + } + + cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + checkInvokeAllAsyncResult(cache, resMap, null, value(0), true); + + cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), true); + + cache.removeAll(keys); + + cache.future().get(); + + // TODO IGNITE-2973: should be always false. + interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL; + + try { + cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + checkInvokeAllAsyncResult(cache, resMap, null, value(0), false); + + cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), false); + + cache.removeAll(keys); + + cache.future().get(); + } + finally { + interceptorBinaryObjExp = true; + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @throws Exception If failed. 
+ */ + @SuppressWarnings("serial") + public void testInvokeAllAsyncTx() throws Exception { + if (!txShouldBeUsed()) + return; + + for (TransactionConcurrency conc : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + checkInvokeAllAsycnTx(conc, isolation); + + jcache().removeAll(); + } + } + } + + /** + * + * @param conc Concurrency. + * @param isolation Isolation. + * @throws Exception + */ + private void checkInvokeAllAsycnTx(final TransactionConcurrency conc, final TransactionIsolation isolation) throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + final IgniteCache cache = jcache().withKeepBinary().withAsync(); + + final Set keys = new LinkedHashSet() {{ + for (int i = 0; i < CNT; i++) + add(key(i)); + }}; + + Map> resMap; + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invokeAll(keys, NOOP_ENTRY_PROC); + + resMap = (Map>)cache.future().get(); + + tx.commit(); + } + + for (Map.Entry> e : resMap.entrySet()) { + assertTrue("Key:" + e.getKey(), e.getKey() instanceof BinaryObject); + + assertNull(e.getValue().get()); + } + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + tx.commit(); + } + + checkInvokeAllAsyncResult(cache, resMap, null, value(0), true); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invokeAll(keys, INC_ENTRY_PROC_BINARY_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + tx.commit(); + } + + checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), true); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.removeAll(keys); + + cache.future().get(); + + tx.commit(); + } + + // TODO IGNITE-2973: should be always false. 
+ interceptorBinaryObjExp = atomicityMode() == TRANSACTIONAL; + + try { + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + tx.commit(); + } + + checkInvokeAllAsyncResult(cache, resMap, null, value(0), false); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.invokeAll(keys, INC_ENTRY_PROC_USER_OBJ, dataMode); + + resMap = (Map>)cache.future().get(); + + tx.commit(); + } + + checkInvokeAllAsyncResult(cache, resMap, value(0), value(1), false); + + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + cache.removeAll(keys); + + cache.future().get(); + + tx.commit(); + } + } + finally { + interceptorBinaryObjExp = true; + } + } + }, PLANE_OBJECT, SERIALIZABLE); + } + + /** + * @param cache Cache. + * @param resMap Result map. + * @param expRes Expected result. + * @param cacheVal Expected cache value for key. + * @param deserializeRes Deseriallize result flag. + */ + private void checkInvokeAllAsyncResult(IgniteCache cache, Map> resMap, + Object expRes, Object cacheVal, boolean deserializeRes) { + for (Map.Entry> e : resMap.entrySet()) { + info("Key: " + e.getKey()); + + assertTrue("Wrong key type, binary object expected: " + e.getKey(), e.getKey() instanceof BinaryObject); + + Object res = e.getValue().get(); + + // TODO IGNITE-2953: delete the following if when the issue wiil be fixed. + if (deserializeRes) + assertEquals(expRes, deserializeRes ? deserializeBinary(res) : res); + + cache.get(e.getKey()); + + assertEquals(cacheVal, deserializeBinary(cache.future().get())); + } + } + + /** + * @param val Value + * @return User object. + */ + private static Object deserializeBinary(Object val) { + assertTrue("Val: " + val, val instanceof BinaryObject); + + return ((BinaryObject)val).deserialize(); + } + + /** + * @param task Task. 
+ */ + protected void runInAllTxModes(TestRunnable task) throws Exception { + info("Executing implicite tx"); + + task.run(); + + if (txShouldBeUsed()) { + for (TransactionConcurrency conc : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + try (Transaction tx = testedGrid().transactions().txStart(conc, isolation)) { + info("Executing explicite tx [isolation" + isolation + ", concurrency=" + conc + "]"); + + task.run(); + + tx.commit(); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java index 81cdb15..485e811 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeConfigVariationsFullApiTest.java @@ -361,16 +361,18 @@ public class IgniteComputeConfigVariationsFullApiTest extends IgniteConfigVariat public void testApplyForCollection() throws Exception { runTest(closureFactories, new ComputeTest() { @Override public void test(Factory factory, Ignite ignite) throws Exception { - Collection params = new ArrayList<>(MAX_JOB_COUNT); + Collection params = new ArrayList<>(MAX_JOB_COUNT); for (int i = 0; i < MAX_JOB_COUNT; ++i) { // value(i - 1): use negative argument of the value method to generate nullong value. - // Use type casting to avoid ambiguous for apply(Callable, Object) vs apply(Callable, Collection). 
- params.add((TestObject)value(i - 1)); + params.add(value(i - 1)); } - Collection results = ignite.compute() - .apply((IgniteClosure)factory.create(), params); + IgniteClosure c = (IgniteClosure)factory.create(); + + // Use type casting to avoid ambiguous for apply(Callable, Object) vs apply(Callable, Collection). + Collection results = ignite.compute().apply((IgniteClosure)c, + (Collection)params); checkResultsClassCount(MAX_JOB_COUNT - 1, results, value(0).getClass()); assertCollectionsEquals("Results value mismatch", createGoldenResults(), results); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee7e2c7d/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java index b20cb0e..4a60671 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java @@ -246,6 +246,15 @@ public class ConfigVariationsTestSuiteBuilder { } /** + * All tests will be run for first {@code testedNodeCnt} grids. For {@code grid(0)}, {@code grid(1)}, ... , + * {@code grid(testedNodeCnt - 1)}. + *

+     * This is usually needed when tests must run on both client and data nodes (see {@link #withClients()}).
+     * <ul>
+     * <li>If the test class extends {@link IgniteConfigVariationsAbstractTest}, use {@code testedNodesCount(2)}.</li>
+     * <li>If the test class extends {@link IgniteCacheConfigVariationsAbstractTest}, use {@code testedNodesCount(3)}.</li>
+     * </ul>
+ * * @param testedNodeCnt Tested node count. * @return {@code this} for chaining. */