Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBAD918C00 for ; Thu, 4 Feb 2016 14:45:11 +0000 (UTC) Received: (qmail 61486 invoked by uid 500); 4 Feb 2016 14:45:11 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 61429 invoked by uid 500); 4 Feb 2016 14:45:11 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 60405 invoked by uid 99); 4 Feb 2016 14:45:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2016 14:45:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87E92E0B31; Thu, 4 Feb 2016 14:45:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 04 Feb 2016 14:45:25 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] ignite git commit: 2224 2224 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01135066 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01135066 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01135066 Branch: refs/heads/sql-store-cmp Commit: 01135066d54df254a0b23afbbffca2ed103e3a8c Parents: 62502b2 Author: Anton Vinogradov Authored: Tue Feb 2 15:25:05 2016 +0300 Committer: Anton Vinogradov Committed: Tue Feb 2 15:25:05 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 45 +- .../processors/cache/CacheEntryImplEx.java | 14 +- .../processors/cache/GridCacheAdapter.java | 297 +++++-- .../processors/cache/GridCacheContext.java | 33 +- .../processors/cache/GridCacheMapEntry.java | 2 +- .../processors/cache/GridCacheProxyImpl.java | 51 ++ .../processors/cache/IgniteCacheProxy.java | 51 ++ .../processors/cache/IgniteInternalCache.java | 85 ++ .../dht/CacheDistributedGetFutureAdapter.java | 15 - .../distributed/dht/GridDhtCacheAdapter.java | 7 +- .../cache/distributed/dht/GridDhtGetFuture.java | 6 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/GridPartitionedGetFuture.java | 38 +- .../dht/GridPartitionedSingleGetFuture.java | 17 +- .../dht/atomic/GridDhtAtomicCache.java | 82 +- .../dht/colocated/GridDhtColocatedCache.java | 42 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 6 +- .../distributed/near/GridNearCacheEntry.java | 3 +- .../distributed/near/GridNearGetFuture.java | 45 +- ...arOptimisticSerializableTxPrepareFuture.java | 2 +- .../near/GridNearOptimisticTxPrepareFuture.java | 4 + .../GridNearPessimisticTxPrepareFuture.java | 2 + .../near/GridNearTransactionalCache.java | 9 +- .../local/atomic/GridLocalAtomicCache.java | 97 ++- .../cache/transactions/IgniteTxEntry.java | 32 +- .../transactions/IgniteTxLocalAdapter.java | 196 +++-- .../cache/transactions/IgniteTxLocalEx.java | 3 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../cache/CacheGetEntryAbstractTest.java | 803 +++++++++++++++++++ ...GetEntryOptimisticReadCommittedSeltTest.java | 36 + ...etEntryOptimisticRepeatableReadSeltTest.java | 36 + ...eGetEntryOptimisticSerializableSeltTest.java | 36 + ...etEntryPessimisticReadCommittedSeltTest.java | 36 + ...tEntryPessimisticRepeatableReadSeltTest.java | 36 + ...GetEntryPessimisticSerializableSeltTest.java | 36 + .../cache/CacheReadThroughRestartSelfTest.java | 43 +- .../CacheSerializableTransactionsTest.java | 142 +++- .../cache/GridCacheAbstractFullApiSelfTest.java | 141 ++++ .../GridCacheInterceptorAbstractSelfTest.java | 172 +++- ...GridCacheDhtEvictionNearReadersSelfTest.java | 2 +- .../multijvm/IgniteCacheProcessProxy.java | 59 +- .../testsuites/IgniteCacheTestSuite4.java | 12 + .../config/benchmark-multicast.properties | 7 + .../IgniteGetEntriesPutAllTxBenchmark.java | 73 ++ .../cache/IgnitePutGetEntryBenchmark.java | 47 ++ .../cache/IgnitePutGetEntryTxBenchmark.java | 73 ++ 47 files changed, 2644 insertions(+), 342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 886dca6..a791e38 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -31,10 +31,12 @@ import javax.cache.CacheException; import javax.cache.configuration.Configuration; import javax.cache.event.CacheEntryRemovedListener; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheWriter; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; @@ -390,18 +392,59 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS * null value for a key. */ @IgniteAsyncSupported - Map> invokeAll(Map> map, + public Map> invokeAll(Map> map, Object... args); /** {@inheritDoc} */ @IgniteAsyncSupported @Override public V get(K key); + /** + * Gets an entry from the cache. + *

+ * If the cache is configured to use read-through, and get would return null + * because the entry is missing from the cache, the Cache's {@link CacheLoader} + * is called in an attempt to load the entry. + * + * @param key the key whose associated value is to be returned + * @return the element, or null, if it does not exist. + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws NullPointerException if the key is null + * @throws CacheException if there is a problem fetching the value + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache} + */ + @IgniteAsyncSupported + public CacheEntry getEntry(K key); + /** {@inheritDoc} */ @IgniteAsyncSupported @Override public Map getAll(Set keys); /** + * Gets a collection of entries from the {@link Cache}. + *

+ * If the cache is configured read-through, and a get for a key would + * return null because an entry is missing from the cache, the Cache's + * {@link CacheLoader} is called in an attempt to load the entry. If an + * entry cannot be loaded for a given key, the key will not be present in + * the returned Collection. + * + * @param keys The keys whose associated values are to be returned. + * @return A collection of entries that were found for the given keys. Entries not found + * in the cache are not in the returned collection. + * @throws NullPointerException if keys is null or if keys contains a null + * @throws IllegalStateException if the cache is {@link #isClosed()} + * @throws CacheException if there is a problem fetching the values + * @throws ClassCastException if the implementation is configured to perform + * runtime-type-checking, and the key or value types are incompatible with those that have been + * configured for the {@link Cache} + */ + @IgniteAsyncSupported + public Collection> getEntries(Set keys); + + /** * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries * and will not lock any keys if pessimistic transaction is started by thread. * http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java index 1c7111a..af926c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java @@ -21,9 +21,13 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; + /** * */ @@ -54,6 +58,14 @@ public class CacheEntryImplEx extends CacheEntryImpl implements Cach /** {@inheritDoc} */ public GridCacheVersion version() { + if (ver == GET_ENTRY_INVALID_VER_AFTER_GET) { + throw new IgniteException("Impossible to get entry version after " + + "get() inside OPTIMISTIC REPEATABLE_READ transaction. Use only getEntry() or getEntries() inside " + + "OPTIMISTIC REPEATABLE_READ transaction to solve this problem."); + } + else if (ver == GET_ENTRY_INVALID_VER_UPDATED) + throw new IgniteException("Impossible to get version for entry updated in transaction."); + return ver; } @@ -81,7 +93,7 @@ public class CacheEntryImplEx extends CacheEntryImpl implements Cach String res = "CacheEntry [key=" + getKey() + ", val=" + getValue(); - if (ver != null) { + if (ver != null && ver != GET_ENTRY_INVALID_VER_AFTER_GET && ver != GET_ENTRY_INVALID_VER_UPDATED) { res += ", topVer=" + ver.topologyVersion() + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9fd65e5..69abc54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -52,6 +52,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; @@ -607,7 +608,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture> fut) throws IgniteCheckedException { Map kvMap = fut.get(); @@ -1296,7 +1299,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache>, V>() { @Override public V applyx(IgniteInternalFuture> e) throws IgniteCheckedException { return e.get().get(key); @@ -1332,7 +1337,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache getEntry(K key) throws IgniteCheckedException { + A.notNull(key, "key"); + + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? System.nanoTime() : 0L; + + T2 t = (T2)get(key, !ctx.keepBinary(), true); + + CacheEntry val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()): null; + + if (ctx.config().getInterceptor() != null) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + + val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null) : null; + } + + if (statsEnabled) + metrics0().addGetTimeNanos(System.nanoTime() - start); + + return val; + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture getAsync(final K key) { A.notNull(key, "key"); @@ -1391,7 +1422,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache fut = getAsync(key, !ctx.keepBinary()); + IgniteInternalFuture fut = getAsync(key, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) fut = fut.chain(new CX1, V>() { @@ -1407,6 +1438,42 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> getEntryAsync(final K key) { + A.notNull(key, "key"); + + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteInternalFuture> fut = + (IgniteInternalFuture>)getAsync(key, !ctx.keepBinary(), true); + + final boolean intercept = ctx.config().getInterceptor() != null; + + IgniteInternalFuture> fr = fut.chain( + new CX1>, CacheEntry>() { + @Override public CacheEntry applyx(IgniteInternalFuture> f) + throws IgniteCheckedException { + T2 t = f.get(); + + CacheEntry val = t != null ? new CacheEntryImplEx<>(key, t.get1(), t.get2()) : null; + if (intercept) { + V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); + + return new CacheEntryImplEx<>(key, val0, t != null ? t.get2() : null); + } + else + return val; + } + }); + + if (statsEnabled) + fut.listen(new UpdateGetTimeStatClosure>(metrics0(), start)); + + return fr; + } + + /** {@inheritDoc} */ @Override public Map getAll(@Nullable Collection keys) throws IgniteCheckedException { A.notNull(keys, "keys"); @@ -1414,7 +1481,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache map = getAll(keys, !ctx.keepBinary()); + Map map = getAll(keys, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) map = interceptGet(keys, map); @@ -1426,6 +1493,32 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> getEntries(@Nullable Collection keys) + throws IgniteCheckedException { + A.notNull(keys, "keys"); + + boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + long start = statsEnabled ? System.nanoTime() : 0L; + + Map> map = (Map>)getAll(keys, !ctx.keepBinary(), true); + + Collection> res = new HashSet<>(); + + if (ctx.config().getInterceptor() != null) + res = interceptGetEntries(keys, map); + else + for (Map.Entry> e : map.entrySet()) + res.add(new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + + if (statsEnabled) + metrics0().addGetTimeNanos(System.nanoTime() - start); + + return res; + } + + + /** {@inheritDoc} */ @Override public IgniteInternalFuture> getAllAsync(@Nullable final Collection keys) { A.notNull(keys, "keys"); @@ -1433,7 +1526,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> fut = getAllAsync(keys, !ctx.keepBinary()); + IgniteInternalFuture> fut = getAllAsync(keys, !ctx.keepBinary(), false); if (ctx.config().getInterceptor() != null) return fut.chain(new CX1>, Map>() { @@ -1448,6 +1541,45 @@ public abstract class GridCacheAdapter implements IgniteInternalCache>> getEntriesAsync( + @Nullable final Collection keys) { + A.notNull(keys, "keys"); + + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); + + final long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteInternalFuture>> fut = + (IgniteInternalFuture>>) + ((IgniteInternalFuture)getAllAsync(keys, !ctx.keepBinary(), true)); + + final boolean intercept = ctx.config().getInterceptor() != null; + + IgniteInternalFuture>> rf = + fut.chain(new CX1>>, Collection>>() { + @Override public Collection> applyx( + IgniteInternalFuture>> f) throws IgniteCheckedException { + if (intercept) + return interceptGetEntries(keys, f.get()); + else { + Map> res = U.newHashMap(f.get().size()); + + for (Map.Entry> e : f.get().entrySet()) + res.put(e.getKey(), + new CacheEntryImplEx<>(e.getKey(), e.getValue().get1(), e.getValue().get2())); + + return res.values(); + } + } + }); + + if (statsEnabled) + fut.listen(new UpdateGetTimeStatClosure>>(metrics0(), start)); + + return rf; + } + /** * Applies cache interceptor on result of 'get' operation. * @@ -1490,6 +1622,53 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> interceptGetEntries( + @Nullable Collection keys, Map> map) { + Map> res; + + if (F.isEmpty(keys)) { + assert map.isEmpty(); + + return Collections.emptySet(); + } + + res = U.newHashMap(keys.size()); + + CacheInterceptor interceptor = cacheCfg.getInterceptor(); + + assert interceptor != null; + + for (Map.Entry> e : map.entrySet()) { + V val = interceptor.onGet(e.getKey(), e.getValue().get1()); + + if (val != null) + res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().get2())); + } + + if (map.size() != keys.size()) { // Not all requested keys were in cache. + for (K key : keys) { + if (key != null) { + if (!map.containsKey(key)) { + V val = interceptor.onGet(key, null); + + if (val != null) + res.put(key, new CacheEntryImplEx<>(key, val, null)); + } + } + } + } + + return res.values(); + } + + /** * @param key Key. * @param forcePrimary Force primary. * @param skipTx Skip tx. @@ -1498,6 +1677,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache getAsync( @@ -1508,7 +1688,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache>, V>() { @Override public V applyx(IgniteInternalFuture> e) throws IgniteCheckedException { Map map = e.get(); @@ -1544,6 +1726,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache(res.get1(), res.get2())); - } - else { - ctx.addResult(map, - key, - res.get1(), - skipVals, - keepCacheObjects, - deserializeBinary, - true); - } + ctx.addResult(map, + key, + res.get1(), + skipVals, + keepCacheObjects, + deserializeBinary, + true, + needVer ? res.get2() : null); if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -1783,20 +1964,14 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(cacheVal, set ? verSet : ver)); - } - else { - ctx.addResult(map, - key, - cacheVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false); - } + ctx.addResult(map, + key, + cacheVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? set ? verSet : ver : null); } if (tx0 == null || (!tx0.implicit() && @@ -1889,11 +2064,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache>(keys) { @Override public IgniteInternalFuture> op(IgniteTxLocalAdapter tx) { - return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough); + return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough, needVer); } }, ctx.operationContextPerCall()); } @@ -4494,28 +4667,31 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache getAsync(final K key, boolean deserializeBinary) { + public final IgniteInternalFuture getAsync(final K key, boolean deserializeBinary, final boolean needVer) { try { checkJta(); } @@ -4548,28 +4726,32 @@ public abstract class GridCacheAdapter implements IgniteInternalCache getAll(Collection keys, boolean deserializeBinary) throws IgniteCheckedException { + public Map getAll(Collection keys, boolean deserializeBinary, + boolean needVer) throws IgniteCheckedException { checkJta(); - return getAllAsync(keys, deserializeBinary).get(); + return getAllAsync(keys, deserializeBinary, needVer).get(); } /** * @param keys Keys. * @param deserializeBinary Deserialize binary flag. + * @param needVer Need version. * @return Read future. */ public IgniteInternalFuture> getAllAsync(@Nullable Collection keys, - boolean deserializeBinary) { + boolean deserializeBinary, boolean needVer) { String taskName = ctx.kernalContext().job().currentTaskName(); return getAllAsync(keys, @@ -4579,7 +4761,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements Externalizable { * @param keepCacheObjects Keep cache objects flag. * @param deserializeBinary Deserialize binary flag. * @param cpy Copy flag. + * @param ver GridCacheVersion. */ @SuppressWarnings("unchecked") public void addResult(Map map, @@ -1890,7 +1892,8 @@ public class GridCacheContext implements Externalizable { boolean skipVals, boolean keepCacheObjects, boolean deserializeBinary, - boolean cpy) { + boolean cpy, + final GridCacheVersion ver) { assert key != null; assert val != null || skipVals; @@ -1902,10 +1905,32 @@ public class GridCacheContext implements Externalizable { assert key0 != null : key; assert val0 != null : val; - map.put((K1)key0, (V1)val0); + map.put((K1)key0, ver != null ? (V1)new T2<>(val0, ver) : (V1)val0); } else - map.put((K1)key, (V1)(skipVals ? true : val)); + map.put((K1)key, + (V1)(ver != null ? + (V1)new T2<>(skipVals ? true : val, ver) : + skipVals ? true : val)); + } + + /** + * @param map Map. + * @param key Key. + * @param val Value. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects flag. + * @param deserializeBinary Deserialize binary flag. + * @param cpy Copy flag. + */ + public void addResult(Map map, + KeyCacheObject key, + CacheObject val, + boolean skipVals, + boolean keepCacheObjects, + boolean deserializeBinary, + boolean cpy) { + addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, cpy, null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2d25d16..64cfd01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -882,7 +882,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateTtl(expiryPlc); if (retVer) { - resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : this.ver; + resVer = (isNear() && cctx.transactional()) ? ((GridNearCacheEntry)this).dhtVersion() : this.ver; if (resVer == null) ret = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 3a53942..9b4aff3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -31,6 +31,7 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; @@ -307,6 +308,18 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Nullable @Override public CacheEntry getEntry(K key) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntry(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public V getTopologySafe(K key) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -331,6 +344,18 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalFuture> getEntryAsync(K key) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntryAsync(key); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public V getForcePrimary(K key) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); @@ -451,6 +476,19 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Override public Collection> getEntries( + @Nullable Collection keys) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntries(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture> getAllAsync(@Nullable Collection keys) { CacheOperationContext prev = gate.enter(opCtx); @@ -463,6 +501,19 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalFuture>> getEntriesAsync( + @Nullable Collection keys) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getEntriesAsync(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Nullable @Override public V getAndPut(K key, V val) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9e66d4d..5ed8753 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -44,6 +44,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheManager; import org.apache.ignite.cache.CacheMetrics; @@ -873,6 +874,31 @@ public class IgniteCacheProxy extends AsyncSupportAdapter getEntry(K key) { + try { + GridCacheGateway gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getEntryAsync(key)); + + return null; + } + else + return delegate.getEntry(key); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public Map getAll(Set keys) { try { GridCacheGateway gate = this.gate; @@ -898,6 +924,31 @@ public class IgniteCacheProxy extends AsyncSupportAdapter> getEntries(Set keys) { + try { + GridCacheGateway gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getEntriesAsync(keys)); + + return null; + } + else + return delegate.getEntries(keys); + } + finally { + onLeave(gate, prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + + /** {@inheritDoc} */ @Override public Map getAllOutTx(Set keys) { try { GridCacheGateway gate = this.gate; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 433290c..68d0f06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -335,6 +336,28 @@ public interface IgniteInternalCache extends Iterable> { @Nullable public V get(K key) throws IgniteCheckedException; /** + * Retrieves value mapped to the specified key from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. The return value of {@code null} + * means entry did not pass the provided filter or cache has no mapping for the + * key. + *

+ * If the value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disable, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * CacheStore#load(Transaction, Object) method. + *

Transactions

+ * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param key Key to retrieve the value for. + * @return Value for the given key. + * @throws IgniteCheckedException If get operation failed. + * @throws NullPointerException if the key is {@code null}. + */ + @Nullable public CacheEntry getEntry(K key) throws IgniteCheckedException; + + /** * Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. The return value of {@code null} @@ -356,6 +379,27 @@ public interface IgniteInternalCache extends Iterable> { public IgniteInternalFuture getAsync(K key); /** + * Asynchronously retrieves value mapped to the specified key from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. The return value of {@code null} + * means entry did not pass the provided filter or cache has no mapping for the + * key. + *

+ * If the value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * CacheStore#load(Transaction, Object) method. + *

Transactions

+ * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param key Key for the value to get. + * @return Future for the get operation. + * @throws NullPointerException if the key is {@code null}. + */ + public IgniteInternalFuture> getEntryAsync(K key); + + /** * Retrieves values mapped to the specified keys from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. If requested key-value pair @@ -377,6 +421,27 @@ public interface IgniteInternalCache extends Iterable> { public Map getAll(@Nullable Collection keys) throws IgniteCheckedException; /** + * Retrieves values mapped to the specified keys from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. If requested key-value pair + * is not present in the returned map, then it means that its entry did not pass the provided + * filter or cache has no mapping for the key. + *

+ * If some value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure) method. + *

Transactions

+ * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param keys Keys to get. + * @return Map of key-value pairs. + * @throws IgniteCheckedException If get operation failed. + */ + public Collection> getEntries(@Nullable Collection keys) throws IgniteCheckedException; + + /** * Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if * its entry passed the optional filter provided. Filter check is atomic, and therefore the * returned value is guaranteed to be consistent with the filter. If requested key-value pair @@ -397,6 +462,26 @@ public interface IgniteInternalCache extends Iterable> { public IgniteInternalFuture> getAllAsync(@Nullable Collection keys); /** + * Asynchronously retrieves values mapped to the specified keys from cache. Value will only be returned if + * its entry passed the optional filter provided. Filter check is atomic, and therefore the + * returned value is guaranteed to be consistent with the filter. If requested key-value pair + * is not present in the returned map, then it means that its entry did not pass the provided + * filter or cache has no mapping for the key. + *

+ * If some value is not present in cache, then it will be looked up from swap storage. If + * it's not present in swap, or if swap is disabled, and if read-through is allowed, value + * will be loaded from {@link CacheStore} persistent storage via + * CacheStore#loadAll(Transaction, Collection, org.apache.ignite.lang.IgniteBiInClosure) method. + *

Transactions

+ * This method is transactional and will enlist the entry into ongoing transaction + * if there is one. + * + * @param keys Key for the value to get. + * @return Future for the get operation. + */ + public IgniteInternalFuture>> getEntriesAsync(@Nullable Collection keys); + + /** * Stores given key-value pair in cache. If filters are provided, then entries will * be stored in cache only if they pass the filter. Note that filter check is atomic, * so value stored in cache is guaranteed to be consistent with the filters. If cache http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java index 7efaf49..28c94dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -153,21 +153,6 @@ public abstract class CacheDistributedGetFutureAdapter extends GridCompoun } /** - * @param map Result map. - * @param key Key. - * @param val Value. - * @param ver Version. - */ - @SuppressWarnings("unchecked") - protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) { - assert needVer; - assert skipVals || val != null; - assert ver != null; - - map.put(key, new T2<>(skipVals ? true : val, ver)); - } - - /** * Affinity node to send get request to. * * @param affNodes All affinity nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 9cf8084..5be4e72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -617,6 +617,7 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap * @param keys {@inheritDoc} * @param forcePrimary {@inheritDoc} * @param skipTx {@inheritDoc} + * @param needVer Need version. * @return {@inheritDoc} */ @Override public IgniteInternalFuture> getAllAsync( @@ -627,7 +628,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap String taskName, boolean deserializeBinary, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -640,7 +642,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap forcePrimary, null, skipVals, - canRemap); + canRemap, + needVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index cb8c842..c926c13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -383,7 +383,8 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture(); @@ -1013,7 +1013,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture extends CacheDistributedGetFutureAda cache.removeIfObsolete(key); } else { - if (needVer) - versionedResult(locVals, key, v, ver); - else { - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true); - } + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + ver); return true; } @@ -552,17 +549,14 @@ public class GridPartitionedGetFuture extends CacheDistributedGetFutureAda for (GridCacheEntryInfo info : infos) { assert skipVals == (info.value() == null); - if (needVer) - versionedResult(map, info.key(), info.value(), info.version()); - else { - cctx.addResult(map, - info.key(), - info.value(), - skipVals, - keepCacheObjects, - deserializeBinary, - false); - } + cctx.addResult(map, + info.key(), + info.value(), + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? info.version() : null); } return map; http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 0c811ae..01e61bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -625,20 +625,13 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter im assert !skipVals; if (val != null) { - if (needVer) { - assert ver != null; + if (!keepCacheObjects) { + Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); - onDone(new T2<>(val, ver)); - } - else { - if (!keepCacheObjects) { - Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary); - - onDone(res); - } - else - onDone(val); + onDone(needVer ? new T2<>(res, ver) : res); } + else + onDone(needVer ? new T2<>(val, ver) : val); } else onDone(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cba4e61..b806906 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -317,7 +318,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } /** {@inheritDoc} */ - @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException { + @Override protected V get(K key, String taskName, boolean deserializeBinary, boolean needVer) + throws IgniteCheckedException { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -339,7 +341,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { expiryPlc, false, skipStore, - true).get(); + true, + needVer).get(); } /** {@inheritDoc} */ @@ -350,7 +353,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { final String taskName, final boolean deserializeBinary, final boolean skipVals, - final boolean canRemap) { + final boolean canRemap, + final boolean needVer) { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -376,7 +380,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { expiryPlc, skipVals, skipStore, - canRemap); + canRemap, + needVer); } }); } @@ -390,7 +395,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { final String taskName, final boolean deserializeBinary, final boolean skipVals, - final boolean canRemap + final boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -420,7 +426,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { expiryPlc, skipVals, skipStore, - canRemap); + canRemap, + needVer); } }); } @@ -1098,6 +1105,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param skipVals Skip values flag. * @param skipStore Skip store flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Get future. */ private IgniteInternalFuture getAsync0(KeyCacheObject key, @@ -1108,7 +1116,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable ExpiryPolicy expiryPlc, boolean skipVals, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion(); @@ -1126,7 +1135,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { expiry, skipVals, canRemap, - false, + needVer, false); fut.init(); @@ -1145,6 +1154,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param skipStore Skip store flag. + * @param needVer Need version. * @return Get future. */ private IgniteInternalFuture> getAllAsync0(@Nullable Collection keys, @@ -1155,7 +1165,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable ExpiryPolicy expiryPlc, boolean skipVals, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion(); @@ -1180,19 +1191,42 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiry, - !deserializeBinary); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2 res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiry, + true); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -1204,7 +1238,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { success = false; } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true); + ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver); } else success = false; @@ -1256,7 +1290,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { expiry, skipVals, canRemap, - false, + needVer, false); fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 073043d..dc4b6bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -200,7 +200,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap) { + boolean canRemap, + final boolean needVer) { ctx.checkSecurity(SecurityPermission.CACHE_READ); if (keyCheck) @@ -218,7 +219,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals, false, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + needVer); return fut.chain(new CX1>, V>() { @SuppressWarnings("unchecked") @@ -258,7 +260,7 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, canRemap, - /*needVer*/false, + needVer, /*keepCacheObjects*/false); fut.init(); @@ -275,7 +277,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte String taskName, final boolean deserializeBinary, final boolean skipVals, - boolean canRemap + boolean canRemap, + final boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -297,7 +300,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals, false, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + needVer); } }, opCtx); } @@ -318,7 +322,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte deserializeBinary, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, - canRemap); + canRemap, + needVer); } /** {@inheritDoc} */ @@ -345,6 +350,7 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Loaded values. */ public IgniteInternalFuture> loadAsync( @@ -357,7 +363,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte boolean deserializeBinary, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap) { + boolean canRemap, + boolean needVer) { return loadAsync(keys, readThrough, forcePrimary, @@ -367,7 +374,7 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte expiryPlc, skipVals, canRemap, - false, + needVer, false); } @@ -522,17 +529,14 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte if (locVals == null) locVals = U.newHashMap(keys.size()); - if (needVer) - locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver)); - else { - ctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObj, - deserializeBinary, - true); - } + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializeBinary, + true, + ver); } } else http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a2d5adb..63c073d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -400,7 +400,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { String taskName, boolean deserializeBinary, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -423,7 +424,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, opCtx != null && opCtx.skipStore(), - canRemap); + canRemap, + needVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 5bf18d9..c750be6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -230,6 +230,7 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda * @param skipVal Skip value flag. * @param skipStore Skip store flag. * @param canRemap Can remap flag. + * @param needVer Need version. * @return Loaded values. */ public IgniteInternalFuture> loadAsync(@Nullable IgniteInternalTx tx, @@ -241,7 +242,8 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda @Nullable ExpiryPolicy expiryPlc, boolean skipVal, boolean skipStore, - boolean canRemap + boolean canRemap, + boolean needVer ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); @@ -261,7 +263,7 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda expiry, skipVal, canRemap, - false, + needVer, false); // init() will register future for responses if future has remote mappings. http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index c0a1617..026fb4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -350,7 +350,8 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { null, false, /*skip store*/false, - /*can remap*/true + /*can remap*/true, + false ).get().get(keyValue(false)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 9291001..06fc0a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -650,26 +650,25 @@ public final class GridNearGetFuture extends CacheDistributedGetFutureAdap */ @SuppressWarnings("unchecked") private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) { - if (needVer) { - V val0 = (V)new T2<>(skipVals ? true : v, ver); + if (keepCacheObjects) { + K key0 = (K)key; + V val0 = needVer ? + (V)new T2<>(skipVals ? true : v, ver) : + (V)(skipVals ? true : v); - add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - if (keepCacheObjects) { - K key0 = (K)key; - V val0 = (V)(skipVals ? true : v); - - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } - else { - K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); - V val0 = !skipVals ? + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false); + V val0 = needVer ? + (V)new T2<>(!skipVals ? + (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : + (V)Boolean.TRUE, ver) : + !skipVals ? (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) : (V)Boolean.TRUE; - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); - } + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } } @@ -741,16 +740,14 @@ public final class GridNearGetFuture extends CacheDistributedGetFutureAdap assert skipVals == (info.value() == null); - if (needVer) - versionedResult(map, key, val, info.version()); - else - cctx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false); + cctx.addResult(map, + key, + val, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? info.version() : null); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 4f9f227..52ebfc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -107,7 +107,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (txEntry != null) { if (entry.context().isLocal()) { - GridCacheVersion serReadVer = txEntry.serializableReadVersion(); + GridCacheVersion serReadVer = txEntry.entryReadVersion(); if (serReadVer != null) { GridCacheContext ctx = entry.context(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01135066/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index bae0327..b968e57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -279,6 +279,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param topLocked {@code True} if thread already acquired lock preventing topology change. */ private void prepareSingle(IgniteTxEntry write, boolean topLocked) { + write.clearEntryReadVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); assert topVer.topologyVersion() > 0; @@ -339,6 +341,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa Queue mappings = new ArrayDeque<>(); for (IgniteTxEntry write : writes) { + write.clearEntryReadVersion(); + GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) {