Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7121518789 for ; Tue, 13 Oct 2015 07:05:35 +0000 (UTC) Received: (qmail 83831 invoked by uid 500); 13 Oct 2015 07:05:35 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 83717 invoked by uid 500); 13 Oct 2015 07:05:35 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 83612 invoked by uid 99); 13 Oct 2015 07:05:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Oct 2015 07:05:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85F55E0BAC; Tue, 13 Oct 2015 07:05:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 13 Oct 2015 07:05:40 -0000 Message-Id: In-Reply-To: <5ee864c094804e66be5db9c770ceaa9a@git.apache.org> References: <5ee864c094804e66be5db9c770ceaa9a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/21] ignite git commit: ignite-1607 WIP ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdd6f1c9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdd6f1c9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdd6f1c9 Branch: refs/heads/ignite-1607 Commit: fdd6f1c9e37a0e17665dfc301e9cb0ba4a13b005 Parents: e05a7f8 Author: sboikov Authored: Thu Oct 8 12:09:17 2015 +0300 Committer: sboikov Committed: Thu Oct 8 14:41:45 2015 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 22 +- .../transactions/IgniteTxLocalAdapter.java | 318 +++++++++---------- .../cache/transactions/IgniteTxLocalEx.java | 4 +- .../IgniteTxMultiThreadedAbstractTest.java | 97 +++--- ...CachePartitionedTxMultiThreadedSelfTest.java | 3 + 5 files changed, 216 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c43cab5..9207bd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -343,7 +343,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture loadMissing( + @Override public IgniteInternalFuture loadMissing( final GridCacheContext cacheCtx, boolean readThrough, boolean async, @@ -361,8 +361,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { accessPolicy(cacheCtx, keys), skipVals, needVer, - c).chain(new C1>, Boolean>() { - @Override public Boolean apply(IgniteInternalFuture> f) { + c).chain(new C1>, Void>() { + @Override public Void apply(IgniteInternalFuture> f) { try { Map map = f.get(); @@ -373,7 +373,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } - return true; + return null; } catch (Exception e) { setRollbackOnly(); @@ -398,8 +398,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /*can remap*/true, needVer, c - ).chain(new C1>, Boolean>() { - @Override public Boolean apply(IgniteInternalFuture> f) { + ).chain(new C1>, Void>() { + @Override public Void apply(IgniteInternalFuture> f) { try { Map map = f.get(); @@ -410,7 +410,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } } - return true; + return null; } catch (Exception e) { setRollbackOnly(); @@ -1223,12 +1223,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return plc; } - /** - * @param cacheCtx Cache context. - * @param keys Keys. - * @return Expiry policy. - */ - private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { + /** {@inheritDoc} */ + @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { if (accessMap != null) { for (Map.Entry e : accessMap.entrySet()) { if (e.getKey().cacheId() == cacheCtx.cacheId() && keys.contains(e.getKey().key())) http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 76df164..99b4c45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -87,10 +87,8 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.plugin.security.SecurityPermission; @@ -418,7 +416,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture loadMissing( + @Override public IgniteInternalFuture loadMissing( final GridCacheContext cacheCtx, final boolean readThrough, boolean async, @@ -428,41 +426,106 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean needVer, final GridInClosure3 c ) { - // TODO IGNITE-1607. - return new GridFinishedFuture<>(); - -// if (!async) { -// try { -// if (!readThrough || !cacheCtx.readThrough()) { -// for (KeyCacheObject key : keys) -// c.apply(key, null, null); -// -// return new GridFinishedFuture<>(false); -// } -// -// return new GridFinishedFuture<>( -// cacheCtx.store().loadAll(this, keys, c)); -// } -// catch (IgniteCheckedException e) { -// return new GridFinishedFuture<>(e); -// } -// } -// else { -// return cctx.kernalContext().closure().callLocalSafe( -// new GPC() { -// @Override public Boolean call() throws Exception { -// if (!readThrough || !cacheCtx.readThrough()) { -// for (KeyCacheObject key : keys) -// c.apply(key, null, null); -// -// return false; -// } -// -// return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); -// } -// }, -// true); -// } + assert cacheCtx.isLocal() : cacheCtx.name(); + + if (!readThrough || !cacheCtx.readThrough()) { + for (KeyCacheObject key : keys) + c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); + + return new GridFinishedFuture<>(); + } + + try { + IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys); + + Map misses = null; + + for (KeyCacheObject key : keys) { + while (true) { + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + + try { + T2 res = entry.innerGetVersioned(this, + /*readSwap*/true, + /*unmarshal*/true, + /*update-metrics*/!skipVals, + /*event*/!skipVals, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + expiryPlc); + + if (res == null) { + if (misses == null) + misses = new LinkedHashMap<>(); + + misses.put(key, entry.version()); + } + else + c.apply(key, res.get1(), res.get2()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry, will retry: " + key); + } + } + } + + if (misses != null) { + final Map misses0 = misses; + + cacheCtx.store().loadAll(this, misses.keySet(), new CI2() { + private GridCacheVersion nextVer; + + @Override public void apply(KeyCacheObject key, Object val) { + GridCacheVersion ver = misses0.remove(key); + + assert ver != null : key; + + c.apply(key, val, ver); + + if (nextVer == null) + nextVer = cacheCtx.versions().next(); + + CacheObject cacheVal = cacheCtx.toCacheObject(val); + + while (true) { + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + + try { + boolean set = entry.versionedValue(cacheVal, ver, nextVer); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry [set=" + set + + ", curVer=" + ver + ", newVer=" + nextVer + ", " + + "entry=" + entry + ']'); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry, (will retry): " + entry); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } + } + + } + }); + + for (KeyCacheObject key : misses0.keySet()) + c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } } /** @@ -1517,6 +1580,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param keys Keys. + * @return Expiry policy. + */ + protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { + return null; + } + + /** * Adds skipped key. * * @param skipped Skipped set (possibly {@code null}). @@ -1566,46 +1638,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final boolean needReadVer = optimistic() && serializable(); return new GridEmbeddedFuture<>( - new C2>() { - @Override public Map apply(Boolean b, Exception e) { + new C2>() { + @Override public Map apply(Void v, Exception e) { if (e != null) { setRollbackOnly(); throw new GridClosureException(e); } - if (!b && !readCommitted()) { - // There is no store - we must mark the entries. - for (KeyCacheObject key : missedMap.keySet()) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - - if (txEntry != null) - txEntry.markValid(); - } - } - - if (readCommitted()) { - assert loaded != null; - - Collection notFound = new HashSet<>(missedMap.keySet()); - - notFound.removeAll(loaded); - - // In read-committed mode touch entries that have just been read. - for (KeyCacheObject key : notFound) { - if (loaded.contains(key)) - continue; - - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - - GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : - txEntry.cached(); - - if (entry != null) - cacheCtx.evicts().touch(entry, topologyVersion()); - } - } - return map; } }, @@ -1618,9 +1658,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter skipVals, needReadVer, new GridInClosure3() { - /** */ - private GridCacheVersion nextVer; - @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { if (isRollbackOnly()) { if (log.isDebugEnabled()) @@ -1630,15 +1667,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return; } - GridCacheVersion ver = missedMap.get(key); - - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - - return; - } - CacheObject cacheVal = cacheCtx.toCacheObject(val); CacheObject visibleVal = cacheVal; @@ -1655,99 +1683,47 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter visibleVal = txEntry.applyEntryProcessors(visibleVal); } - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; - - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); - - while (true) { - assert txEntry != null || readCommitted() || skipVals; - - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; - - try { - set = e.versionedValue(cacheVal, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); - - if (pessimistic() && !readCommitted() && !isRollbackOnly()) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - - setRollbackOnly(); - - return; - } - - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey)); - - continue; // While loop. - } - - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); - - if (readCommitted() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); - - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } - else { - assert txEntry != null; - - txEntry.setAndMarkValid(cacheVal); + assert txEntry != null || readCommitted() || skipVals; - if (needReadVer) { - assert loadVer != null; + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - txEntry.serializableReadVersion(loadVer); - } + if (readCommitted() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } + } + else { + assert txEntry != null; - if (readCommitted()) - loaded.add(key); + txEntry.setAndMarkValid(cacheVal); - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + if (needReadVer) { + assert loadVer != null; - break; // While loop. + txEntry.serializableReadVersion(loadVer); } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); + + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); } } + + if (readCommitted()) + loaded.add(key); } }) ); @@ -2455,7 +2431,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (missedForLoad != null) { - IgniteInternalFuture fut = loadMissing( + IgniteInternalFuture fut = loadMissing( cacheCtx, /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, /*async*/true, @@ -2508,8 +2484,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter }); return new GridEmbeddedFuture<>( - new C2>() { - @Override public Set apply(Boolean b, Exception e) { + new C2>() { + @Override public Set apply(Void b, Exception e) { if (e != null) throw new GridClosureException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 8f5f37b..55d0dbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -165,9 +165,11 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param c Closure. * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. + * @param needVer If {@code true} version is required for loaded values. + * @param c Closure to be applied for loaded values. * @return Future with {@code True} value if loading took place. */ - public IgniteInternalFuture loadMissing( + public IgniteInternalFuture loadMissing( GridCacheContext cacheCtx, boolean readThrough, boolean async, http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java index 9e14d30..7a1a0b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java @@ -219,78 +219,89 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract * @throws Exception If failed. */ public void testOptimisticSerializableConsistency() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-582"); - final IgniteCache cache = grid(0).cache(null); - final int THREADS = 2; + final int THREADS = 3; final int ITERATIONS = 100; - final int key = 0; + for (int key0 = 0; key0 < 20; key0++) { + final int key = key0; - cache.put(key, 0L); + cache.put(key, 0L); - List>> futs = new ArrayList<>(THREADS); + List>> futs = new ArrayList<>(THREADS); - for (int i = 0; i < THREADS; i++) { - futs.add(GridTestUtils.runAsync(new Callable>() { - @Override public Collection call() throws Exception { - Collection res = new ArrayList<>(); + for (int i = 0; i < THREADS; i++) { + futs.add(GridTestUtils.runAsync(new Callable>() { + @Override public Collection call() throws Exception { + Collection res = new ArrayList<>(); - for (int i = 0; i < ITERATIONS; i++) { - while (true) { - try (Transaction tx = grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { - long val = cache.get(key); + for (int i = 0; i < ITERATIONS; i++) { + while (true) { + try (Transaction tx = grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + long val = cache.get(key); - cache.put(key, val + 1); + cache.put(key, val + 1); - tx.commit(); + tx.commit(); - assertTrue(res.add(val + 1)); + assertTrue(res.add(val + 1)); - break; - } - catch(TransactionOptimisticException e) { - log.info("Got error, will retry: " + e); + break; + } + catch(TransactionOptimisticException e) { + log.info("Got error, will retry: " + e); + } } } + + return res; } + })); + } - return res; - } - })); - } + long total = 0; - List> cols = new ArrayList<>(THREADS); + List> cols = new ArrayList<>(THREADS); - for (IgniteInternalFuture> fut : futs) { - Collection col = fut.get(); + for (IgniteInternalFuture> fut : futs) { + Collection col = fut.get(); - assertEquals(ITERATIONS, col.size()); + assertEquals(ITERATIONS, col.size()); - cols.add(col); - } + total += col.size(); - Set duplicates = new HashSet<>(); + cols.add(col); + } - for (Collection col1 : cols) { - for (Long val1 : col1) { - for (Collection col2 : cols) { - if (col1 == col2) - continue; + log.info("Cache value: " + cache.get(key)); - for (Long val2 : col2) { - if (val1.equals(val2)) { - duplicates.add(val2); + Set duplicates = new HashSet<>(); - break; + for (Collection col1 : cols) { + for (Long val1 : col1) { + for (Collection col2 : cols) { + if (col1 == col2) + continue; + + for (Long val2 : col2) { + if (val1.equals(val2)) { + duplicates.add(val2); + + break; + } } } } } - } - assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty()); + assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty()); + + assertEquals((long)THREADS * ITERATIONS, total); + + for (int i = 0; i < gridCount(); i++) + assertEquals(total, (Object)cache.get(key)); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd6f1c9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java index 6ed25eb..346bd34 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java @@ -53,6 +53,9 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh CacheConfiguration cc = defaultCacheConfiguration(); + // TODO IGNITE-1607 add test with near cache. + cc.setNearConfiguration(null); + cc.setCacheMode(PARTITIONED); cc.setBackups(1);