Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 95D7118F33 for ; Wed, 28 Oct 2015 12:06:50 +0000 (UTC) Received: (qmail 21577 invoked by uid 500); 28 Oct 2015 12:06:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 21523 invoked by uid 500); 28 Oct 2015 12:06:44 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 21012 invoked by uid 99); 28 Oct 2015 12:06:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 12:06:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6D46EE0998; Wed, 28 Oct 2015 12:06:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: raulk@apache.org To: commits@ignite.apache.org Date: Wed, 28 Oct 2015 12:07:05 -0000 Message-Id: <940c99c614ce4749b84d5bebb615703b@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/31] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4074eee..c1e9202 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -25,12 +25,10 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -64,7 +62,6 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -78,7 +75,6 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; @@ -131,16 +127,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** TX handler. */ private IgniteTxHandler txHandler; - /** All transactions. */ - private final Queue committedQ = new ConcurrentLinkedDeque8<>(); - - /** Preparing transactions. */ - private final Queue prepareQ = new ConcurrentLinkedDeque8<>(); - - /** Minimum start version. */ - private final ConcurrentNavigableMap startVerCnts = - new ConcurrentSkipListMap<>(); - /** Committed local transactions. */ private final GridBoundedConcurrentOrderedMap completedVers = new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); @@ -308,41 +294,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * USE ONLY FOR MEMORY PROFILING DURING TESTS. */ @Override public void printMemoryStats() { - IgniteInternalTx firstTx = committedQ.peek(); - - int committedSize = committedQ.size(); - - Map.Entry startVerEntry = startVerCnts.firstEntry(); - - GridCacheVersion minStartVer = null; - long dur = 0; - - if (committedSize > 3000) { - minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0); - - IgniteInternalTx stuck = null; - - for (IgniteInternalTx tx : txs()) - if (tx.startVersion().isLess(minStartVer)) { - minStartVer = tx.startVersion(); - dur = U.currentTimeMillis() - tx.startTime(); - - stuck = tx; - } - - X.println("Stuck transaction: " + stuck); - } - X.println(">>> "); X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> threadMapSize: " + threadMap.size()); - X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer + ", dur=" + dur + "ms]"); - X.println(">>> committedQueue [size=" + committedSize + - ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion()) + - ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']'); - X.println(">>> prepareQueueSize: " + prepareQ.size()); - X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + - ", firstVer=" + startVerEntry + ']'); + X.println(">>> idMap [size=" + idMap.size() + ']'); X.println(">>> completedVersSize: " + completedVers.size()); } @@ -361,27 +316,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @return Committed queue size. - */ - public int commitQueueSize() { - return committedQ.size(); - } - - /** - * @return Prepare queue size. - */ - public int prepareQueueSize() { - return prepareQ.size(); - } - - /** - * @return Start version counts. - */ - public int startVersionCountsSize() { - return startVerCnts.size(); - } - - /** * @return Committed versions size. */ public int completedVersionsSize() { @@ -493,42 +427,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return null; } - if (cctx.txConfig().isTxSerializableEnabled()) { - AtomicInt next = new AtomicInt(1); - - boolean loop = true; - - while (loop) { - AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next); - - if (prev == null) - break; // Put succeeded - exit. - - // Previous value was 0, which means that it will be deleted - // by another thread in "decrementStartVersionCount(..)" method. - // In that case, we delete here too, so we can safely try again. - for (;;) { - int p = prev.get(); - - assert p >= 0 : p; - - if (p == 0) { - if (startVerCnts.remove(tx.startVersion(), prev)) - if (log.isDebugEnabled()) - log.debug("Removed count from onCreated callback: " + tx); - - break; // Retry outer loop. - } - - if (prev.compareAndSet(p, p + 1)) { - loop = false; // Increment succeeded - exit outer loop. - - break; - } - } - } - } - if (tx.timeout() > 0) { cctx.time().addTimeoutObject(tx); @@ -822,117 +720,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); } - boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled(); - - // Clean up committed transactions queue. - if (tx.pessimistic() && tx.local()) { - if (tx.enforceSerializable() && txSerEnabled) { - for (Iterator it = committedQ.iterator(); it.hasNext();) { - IgniteInternalTx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) - it.remove(); - } - } - - // Nothing else to do in pessimistic mode. - return; - } - - if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) { - Set readSet = tx.readSet(); - Set writeSet = tx.writeSet(); - - GridCacheVersion startTn = tx.startVersion(); - - GridCacheVersion finishTn = cctx.versions().last(); - - // Add future to prepare queue only on first prepare call. - if (tx.markPreparing()) - prepareQ.offer(tx); - - // Check that our read set does not intersect with write set - // of all transactions that completed their write phase - // while our transaction was in read phase. - for (Iterator it = committedQ.iterator(); it.hasNext();) { - IgniteInternalTx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) { - it.remove(); - - continue; - } - - GridCacheVersion tn = committedTx.endVersion(); - - // We only care about transactions - // with tn > startTn and tn <= finishTn - if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0) - continue; - - if (tx.serializable()) { - if (GridFunc.intersects(committedTx.writeSet(), readSet)) { - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction " + - "(committed vs. read-set conflict): " + tx); - } - } - } - - // Check that our read and write sets do not intersect with write - // sets of all active transactions. - for (Iterator iter = prepareQ.iterator(); iter.hasNext();) { - IgniteInternalTx prepareTx = iter.next(); - - if (prepareTx == tx) - // Skip yourself. - continue; - - // Optimistically remove completed transactions. - if (prepareTx.done()) { - iter.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed finished transaction from active queue: " + prepareTx); - - continue; - } - - // Check if originating node left. - if (cctx.discovery().node(prepareTx.nodeId()) == null) { - iter.remove(); - - rollbackTx(prepareTx); - - if (log.isDebugEnabled()) - log.debug("Removed and rolled back transaction because sender node left grid: " + - CU.txString(prepareTx)); - - continue; - } - - if (tx.serializable() && !prepareTx.isRollbackOnly()) { - Set prepareWriteSet = prepareTx.writeSet(); - - if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { - // Remove from active set. - iter.remove(); - - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticCheckedException( - "Failed to prepare transaction (read-set/write-set conflict): " + tx); - } - } - } - } + if (tx.pessimistic() && tx.local()) + return; // Nothing else to do in pessimistic mode. // Optimistic. assert tx.optimistic() || !tx.local(); @@ -945,40 +734,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param tx Transaction to check. - * @return {@code True} if transaction can be discarded. - */ - private boolean isSafeToForget(IgniteInternalTx tx) { - Map.Entry e = startVerCnts.firstEntry(); - - if (e == null) - return true; - - assert e.getValue().get() >= 0; - - return tx.endVersion().compareTo(e.getKey()) <= 0; - } - - /** - * Decrement start version count. - * - * @param tx Cache transaction. - */ - private void decrementStartVersionCount(IgniteInternalTx tx) { - AtomicInt cnt = startVerCnts.get(tx.startVersion()); - - assert cnt != null : "Failed to find start version count for transaction [startVerCnts=" + startVerCnts + - ", tx=" + tx + ']'; - - assert cnt.get() > 0; - - if (cnt.decrementAndGet() == 0) - if (startVerCnts.remove(tx.startVersion(), cnt)) - if (log.isDebugEnabled()) - log.debug("Removed start version for transaction: " + tx); - } - - /** * @param tx Transaction. */ private void removeObsolete(IgniteInternalTx tx) { @@ -1237,6 +992,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param tx Transaction. + * @return {@code True} if transaction read entries should be unlocked. + */ + private boolean unlockReadEntries(IgniteInternalTx tx) { + if (tx.pessimistic()) + return !tx.readCommitted(); + else + return tx.serializable(); + } + + /** * Commits a transaction. * * @param tx Transaction to commit. @@ -1290,8 +1056,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 4. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 5. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 5. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 6. Notify evictions. @@ -1303,25 +1069,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 8. Assign transaction number at the end of transaction. tx.endVersion(cctx.versions().next(tx.topologyVersion())); - // 9. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 10. Add to committed queue only if it is possible - // that this transaction can affect other ones. - if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable() && !isSafeToForget(tx)) - committedQ.add(tx); - - // 11. Remove from per-thread storage. + // 9. Remove from per-thread storage. clearThreadMap(tx); - // 12. Unregister explicit locks. + // 10. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); } - // 13. Remove Near-2-DHT mappings. + // 11. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) { GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); @@ -1329,10 +1086,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { mappedVers.remove(mapped); } - // 14. Clear context. + // 12. Clear context. resetContext(); - // 15. Update metrics. + // 14. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxCommit(); @@ -1378,8 +1135,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 2. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 3. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 3. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 4. Notify evictions. @@ -1388,26 +1145,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 5. Remove obsolete entries. removeObsolete(tx); - // 6. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 7. Remove from per-thread storage. + // 6. Remove from per-thread storage. clearThreadMap(tx); - // 8. Unregister explicit locks. + // 7. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); - // 9. Remove Near-2-DHT mappings. + // 8. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 10. Clear context. + // 9. Clear context. resetContext(); - // 11. Update metrics. + // 10. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxRollback(); @@ -1445,30 +1198,26 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 1. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 2. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) + // 2. Unlock read resources if required. + if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); // 3. Notify evictions. notifyEvitions(tx); - // 4. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 5. Remove from per-thread storage. + // 4. Remove from per-thread storage. clearThreadMap(tx); - // 6. Unregister explicit locks. + // 5. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); - // 7. Remove Near-2-DHT mappings. + // 6. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 8. Clear context. + // 7. Clear context. resetContext(); if (log.isDebugEnabled()) @@ -1635,6 +1384,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // we wait for the lock. long timeout = tx.timeout() == 0 ? 0 : remainingTime; + GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null; + for (IgniteTxEntry txEntry1 : entries) { // Check if this entry was prepared before. if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null) @@ -1649,7 +1400,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !entry1.detached() : "Expected non-detached entry for near transaction " + "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; - if (!entry1.tmLock(tx, timeout)) { + GridCacheVersion serReadVer = txEntry1.serializableReadVersion(); + + assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1; + + if (!entry1.tmLock(tx, timeout, serOrder, serReadVer)) { // Unlock locks locked so far. for (IgniteTxEntry txEntry2 : entries) { if (txEntry2 == txEntry1) http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 36f1c36..68d03cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -53,9 +53,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** Last version. */ private volatile GridCacheVersion last; - /** Serializable transaction flag. */ - private boolean txSerEnabled; - /** Data center ID. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private byte dataCenterId; @@ -64,6 +61,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { private long gridStartTime; /** */ + private GridCacheVersion ISOLATED_STREAMER_VER; + + /** */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { assert evt.type() == EVT_NODE_METRICS_UPDATED; @@ -79,8 +79,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled(); - last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED); @@ -154,6 +152,27 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { } /** + * Version for entries loaded with isolated streamer, should be less than any version generated + * for entries update. + * + * @return Version for entries loaded with isolated streamer. + */ + public GridCacheVersion isolatedStreamerVersion() { + if (ISOLATED_STREAMER_VER == null) { + long topVer = 1; + + if (gridStartTime == 0) + gridStartTime = cctx.kernalContext().discovery().gridStartTime(); + + topVer += (gridStartTime - TOP_VER_BASE_TIME) / 1000; + + ISOLATED_STREAMER_VER = new GridCacheVersion((int)topVer, 0, 0, 1, dataCenterId); + } + + return ISOLATED_STREAMER_VER; + } + + /** * @return Next version based on current topology. */ public GridCacheVersion next() { @@ -235,36 +254,18 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { int locNodeOrder = (int)cctx.localNode().order(); - if (txSerEnabled) { - synchronized (this) { - long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); - - GridCacheVersion next = new GridCacheVersion( - (int)topVer, - globalTime, - ord, - locNodeOrder, - dataCenterId); - - last = next; + long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); - return next; - } - } - else { - long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); - - GridCacheVersion next = new GridCacheVersion( - (int)topVer, - globalTime, - ord, - locNodeOrder, - dataCenterId); + GridCacheVersion next = new GridCacheVersion( + (int)topVer, + globalTime, + ord, + locNodeOrder, + dataCenterId); - last = next; + last = next; - return next; - } + return next; } /** @@ -273,12 +274,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Last generated version. */ public GridCacheVersion last() { - if (txSerEnabled) { - synchronized (this) { - return last; - } - } - else - return last; + return last; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index ab2a6e8..2190bf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1556,7 +1556,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - GridCacheVersion ver = cctx.versions().next(topVer); + GridCacheVersion ver = cctx.versions().isolatedStreamerVersion(); long ttl = CU.TTL_ETERNAL; long expiryTime = CU.EXPIRE_TIME_ETERNAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index a6e960d..6c4e894 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -54,7 +54,7 @@ import org.apache.ignite.lang.IgniteUuid; * Read access with this level happens the same way as with {@link TransactionIsolation#REPEATABLE_READ} level. * However, in {@link TransactionConcurrency#OPTIMISTIC} mode, if some transactions cannot be serially isolated * from each other, then one winner will be picked and the other transactions in conflict will result in - * {@link org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException} being thrown. + * {@link TransactionOptimisticException} being thrown. * * *

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java index d7671f0..c3be3c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java @@ -42,8 +42,7 @@ public enum TransactionIsolation { * @param ord Ordinal value. * @return Enumerated value or {@code null} if ordinal out of range. */ - @Nullable - public static TransactionIsolation fromOrdinal(int ord) { + @Nullable public static TransactionIsolation fromOrdinal(int ord) { return ord >= 0 && ord < VALS.length ? VALS[ord] : null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java new file mode 100644 index 0000000..c2f9fab --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNearReaderUpdateTest.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionOptimisticException; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheNearReaderUpdateTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 3; + + /** */ + private static Map storeMap = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testGetUpdateMultithreaded() throws Exception { + List> cfgs = new ArrayList<>(); + + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); + cfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); + + { + CacheConfiguration ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false); + + GridTestUtils.setMemoryMode(null, ccfg, GridTestUtils.TestMemoryMode.OFFHEAP_TIERED, 0, 0); + + cfgs.add(ccfg); + } + + final List putNodes = new ArrayList<>(); + + for (int i = 0; i < SRVS + CLIENTS - 1; i++) + putNodes.add(ignite(i)); + + final List getNodes = new ArrayList<>(); + + getNodes.add(ignite(SRVS + CLIENTS - 1)); + getNodes.add(ignite(0)); + + for (CacheConfiguration ccfg : cfgs) { + logCacheInfo(ccfg); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, null, null); + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + getUpdateMultithreaded(ccfg, putNodes, getNodes, PESSIMISTIC, REPEATABLE_READ); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, REPEATABLE_READ); + + getUpdateMultithreaded(ccfg, putNodes, getNodes, OPTIMISTIC, SERIALIZABLE); + } + } + } + + /** + * @param ccfg Cache configuration. + * @param putNodes Nodes executing updates. + * @param getNodes Nodes executing gets. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void getUpdateMultithreaded(CacheConfiguration ccfg, + final List putNodes, + final List getNodes, + final TransactionConcurrency concurrency, + final TransactionIsolation isolation) throws Exception { + log.info("Execute updates [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + final Ignite ignite0 = ignite(0); + + final String cacheName = ignite0.createCache(ccfg).getName(); + + try { + for (int i = 0; i < 5; i++) { + final Integer key = i; + + final AtomicInteger putThreadIdx = new AtomicInteger(); + final AtomicInteger getThreadIdx = new AtomicInteger(); + + final int PUT_THREADS = 20; + final int GET_THREAD = 20; + + final CyclicBarrier barrier = new CyclicBarrier(PUT_THREADS + GET_THREAD); + + final IgniteInternalFuture updateFut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + int idx = putThreadIdx.getAndIncrement() % putNodes.size(); + + Ignite ignite = putNodes.get(idx); + + IgniteCache cache = ignite.cache(cacheName); + + IgniteTransactions txs = ignite.transactions(); + + Thread.currentThread().setName("update-thread-" + ignite.name()); + + barrier.await(); + + for (int i = 0; i < 100; i++) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + if (concurrency != null) { + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cache.put(key, rnd.nextInt()); + + tx.commit(); + } + catch (TransactionOptimisticException ignore) { + assertEquals(concurrency, OPTIMISTIC); + assertEquals(isolation, SERIALIZABLE); + } + } + else + cache.put(key, rnd.nextInt()); + } + + return null; + } + }, PUT_THREADS, "update-thread"); + + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + int idx = getThreadIdx.getAndIncrement() % getNodes.size(); + + Ignite ignite = getNodes.get(idx); + + IgniteCache cache; + + if (ignite.configuration().isClientMode()) + cache = ignite.createNearCache(cacheName, new NearCacheConfiguration()); + else + cache = ignite.cache(cacheName); + + Thread.currentThread().setName("get-thread-" + ignite.name()); + + barrier.await(); + + while (!updateFut.isDone()) + cache.get(key); + + return null; + } + }, GET_THREAD, "get-thread"); + + updateFut.get(); + getFut.get(); + + Integer val = (Integer)ignite0.cache(cacheName).get(key); + + log.info("Iteration [iter=" + i + ", val=" + val + ']'); + + for (Ignite getNode : getNodes) { + IgniteCache cache = getNode.cache(cacheName); + + if (getNode.configuration().isClientMode() || + cache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null) + assertNotNull(getNode.cache(cacheName).localPeek(key)); + } + + checkValue(key, val, cacheName); + + for (int n = 0; n < SRVS + CLIENTS; n++) { + val = n; + + ignite(n).cache(cacheName).put(key, val); + + checkValue(key, val, cacheName); + } + } + } + finally { + destroyCache(ignite0, cacheName); + } + } + + /** + * @param key Key. + * @param expVal Expected value. + * @param cacheName Cache name. + */ + private void checkValue(Object key, Object expVal, String cacheName) { + for (int i = 0; i < SRVS + CLIENTS; i++) { + IgniteCache cache = ignite(i).cache(cacheName); + + assertEquals(expVal, cache.get(key)); + } + } + + /** + * @param ccfg Cache configuration. + */ + private void logCacheInfo(CacheConfiguration ccfg) { + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", sync=" + ccfg.getWriteSynchronizationMode() + + ", backups=" + ccfg.getBackups() + + ", memMode=" + ccfg.getMemoryMode() + + ", near=" + (ccfg.getNearConfiguration() != null) + + ", store=" + ccfg.isWriteThrough() + + ", evictPlc=" + (ccfg.getEvictionPolicy() != null) + + ", swap=" + ccfg.isSwapEnabled() + + ", maxOffheap=" + ccfg.getOffHeapMaxMemory() + + ']'); + } + + /** + * @param ignite Node. + * @param cacheName Cache name. + */ + private void destroyCache(Ignite ignite, String cacheName) { + storeMap.clear(); + + ignite.destroyCache(cacheName); + + for (Ignite ignite0 : G.allGrids()) { + GridTestSwapSpaceSpi spi = (GridTestSwapSpaceSpi)ignite0.configuration().getSwapSpaceSpi(); + + spi.clearAll(); + } + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @param storeEnabled If {@code true} adds cache store. + * @param nearCache If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean storeEnabled, + boolean nearCache) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + if (storeEnabled) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setWriteThrough(true); + ccfg.setReadThrough(true); + } + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new CacheStoreAdapter() { + @Override public Integer load(Integer key) throws CacheLoaderException { + return storeMap.get(key); + } + + @Override public void write(Cache.Entry entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + @Override public void delete(Object key) { + storeMap.remove(key); + } + }; + } + } +}